This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new dd6ed36 Add Python API implementation of workflows-as-code (#6269)
dd6ed36 is described below
commit dd6ed36f65d2add3bf8e31cad24ab25f4606c9d9
Author: Jiajie Zhong <[email protected]>
AuthorDate: Sun Oct 31 20:35:46 2021 +0800
Add Python API implementation of workflows-as-code (#6269)
* Init DS python SDK pydolphinscheduler: python code definition
* Doc first
* Add quick start and developer doc
* Java documentation change
* Add LICENSE-py4j.txt
* Add py4j to release-docs/LICENSE
* Move dependency version to parent pom
* Remove outdated code
* Add tenant parameter to tutorial
---
.gitignore | 5 +
.../api/service/ProjectService.java | 9 +
.../dolphinscheduler/api/service/QueueService.java | 8 +
.../api/service/TenantService.java | 16 ++
.../api/service/impl/ProjectServiceImpl.java | 15 +
.../api/service/impl/QueueServiceImpl.java | 26 ++
.../api/service/impl/TenantServiceImpl.java | 19 +-
.../apache/dolphinscheduler/common/Constants.java | 1 +
.../dolphinscheduler/dao/mapper/QueueMapper.java | 7 +
.../dolphinscheduler/dao/mapper/QueueMapper.xml | 9 +
dolphinscheduler-dist/release-docs/LICENSE | 2 +-
.../release-docs/licenses/LICENSE-py4j.txt | 26 ++
dolphinscheduler-python/pom.xml | 60 ++++
.../pydolphinscheduler/README.md | 103 +++++++
.../pydolphinscheduler/ROADMAP.md | 34 +++
.../pydolphinscheduler/examples/tutorial.py | 43 +++
.../pydolphinscheduler/requirements.txt | 18 ++
.../pydolphinscheduler/requirements_dev.txt | 24 ++
.../pydolphinscheduler/setup.cfg | 16 ++
.../pydolphinscheduler/setup.py | 90 ++++++
.../pydolphinscheduler/src/__init__.py | 16 ++
.../src/pydolphinscheduler/__init__.py | 16 ++
.../src/pydolphinscheduler/constants.py | 74 +++++
.../src/pydolphinscheduler/core/__init__.py | 16 ++
.../src/pydolphinscheduler/core/base.py | 72 +++++
.../src/pydolphinscheduler/core/base_side.py | 43 +++
.../pydolphinscheduler/core/process_definition.py | 249 +++++++++++++++++
.../src/pydolphinscheduler/core/task.py | 237 ++++++++++++++++
.../src/pydolphinscheduler/java_gateway.py | 43 +++
.../src/pydolphinscheduler/side/__init__.py | 20 ++
.../src/pydolphinscheduler/side/project.py | 45 +++
.../src/pydolphinscheduler/side/queue.py | 44 +++
.../src/pydolphinscheduler/side/tenant.py | 45 +++
.../src/pydolphinscheduler/side/user.py | 68 +++++
.../src/pydolphinscheduler/side/worker_group.py | 35 +++
.../src/pydolphinscheduler/tasks/__init__.py | 16 ++
.../src/pydolphinscheduler/tasks/shell.py | 34 +++
.../src/pydolphinscheduler/utils/__init__.py | 16 ++
.../src/pydolphinscheduler/utils/string.py | 30 ++
.../pydolphinscheduler/test/__init__.py | 16 ++
.../pydolphinscheduler/test/core/__init__.py | 16 ++
.../test/core/test_process_definition.py | 118 ++++++++
.../pydolphinscheduler/test/core/test_task.py | 96 +++++++
.../pydolphinscheduler/test/example/__init__.py | 16 ++
.../pydolphinscheduler/test/tasks/__init__.py | 16 ++
.../pydolphinscheduler/test/tasks/test_shell.py | 61 ++++
.../pydolphinscheduler/test/test_java_gateway.py | 46 +++
.../server/PythonGatewayServer.java | 310 +++++++++++++++++++++
dolphinscheduler-standalone-server/pom.xml | 4 +
.../dolphinscheduler/server/StandaloneServer.java | 3 +-
pom.xml | 13 +
tools/dependencies/known-dependencies.txt | 1 +
52 files changed, 2363 insertions(+), 3 deletions(-)
diff --git a/.gitignore b/.gitignore
index 9011db1..3dccfdf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -46,3 +46,8 @@ dolphinscheduler-server/src/main/resources/logback.xml
dolphinscheduler-ui/dist
dolphinscheduler-ui/node
docker/build/apache-dolphinscheduler*
+
+# pydolphinscheduler
+__pycache__/
+build/
+*egg-info/
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
index dffa866..df05dee 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
@@ -47,6 +47,15 @@ public interface ProjectService {
Map<String, Object> queryByCode(User loginUser, long projectCode);
/**
+ * query project details by name
+ *
+ * <p>The project is only returned when it exists and {@code loginUser} passes the project permission check.
+ *
+ * @param loginUser login user, used for the permission check
+ * @param projectName project name
+ * @return result map holding the status message and, on success, the project under
+ * {@code Constants.DATA_LIST}
+ */
+ Map<String, Object> queryByName(User loginUser, String projectName);
+
+ /**
* check project and authorization
*
* @param loginUser login user
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java
index 7013520..f978b96 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java
@@ -76,4 +76,12 @@ public interface QueueService {
*/
Result<Object> verifyQueue(String queue, String queueName);
+ /**
+ * query queue by queueName
+ *
+ * @param queueName queue name, must not be empty
+ * @return result map: REQUEST_PARAMS_NOT_VALID_ERROR when {@code queueName} is empty,
+ * QUEUE_NOT_EXIST when no queue has that name, otherwise SUCCESS with the list of
+ * matching queues under {@code Constants.DATA_LIST}
+ */
+ Map<String, Object> queryQueueName(String queueName);
+
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java
index 30d98a1..47a4082 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java
@@ -92,4 +92,20 @@ public interface TenantService {
* @return true if tenant code can user, otherwise return false
*/
Result verifyTenantCode(String tenantCode);
+
+ /**
+ * check whether a tenant with the given tenant code exists
+ *
+ * @param tenantCode tenant code
+ * @return true if a tenant with {@code tenantCode} exists, false otherwise
+ */
+ boolean checkTenantExists(String tenantCode);
+
+ /**
+ * query tenant by tenant code
+ *
+ * @param tenantCode tenant code
+ * @return result map; when a tenant with {@code tenantCode} exists it holds that single
+ * tenant (not a list) under {@code Constants.DATA_LIST} together with a SUCCESS status
+ */
+ Map<String, Object> queryByTenantCode(String tenantCode);
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
index 7d49cfa..9fc5aab 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
@@ -139,6 +139,21 @@ public class ProjectServiceImpl extends BaseServiceImpl
implements ProjectServic
return result;
}
+    /**
+     * query project details by name
+     *
+     * @param loginUser login user, checked by hasProjectAndPerm
+     * @param projectName project name
+     * @return result map; when the permission check passes and the project exists it holds
+     *         the project under DATA_LIST with SUCCESS status, otherwise it is returned as
+     *         left by hasProjectAndPerm
+     */
+    @Override
+    public Map<String, Object> queryByName(User loginUser, String projectName) {
+        Project project = projectMapper.queryByName(projectName);
+        Map<String, Object> result = new HashMap<>();
+        // the permission check always runs first; it may write its own status into result
+        boolean permitted = hasProjectAndPerm(loginUser, project, result);
+        if (permitted && project != null) {
+            result.put(Constants.DATA_LIST, project);
+            putMsg(result, Status.SUCCESS);
+        }
+        return result;
+    }
+
/**
* check project and authorization
*
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
index edd0e1d..e298833 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
@@ -263,6 +263,32 @@ public class QueueServiceImpl extends BaseServiceImpl
implements QueueService {
}
/**
+     * query queue by queueName
+     *
+     * @param queueName queue name, must not be empty
+     * @return result map: REQUEST_PARAMS_NOT_VALID_ERROR for an empty name, QUEUE_NOT_EXIST when
+     *         no queue has that name, otherwise SUCCESS with the matching queues under DATA_LIST
+     */
+    @Override
+    public Map<String, Object> queryQueueName(String queueName) {
+        Map<String, Object> result = new HashMap<>();
+        if (StringUtils.isEmpty(queueName)) {
+            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.QUEUE_NAME);
+        } else if (!checkQueueNameExist(queueName)) {
+            putMsg(result, Status.QUEUE_NOT_EXIST, queueName);
+        } else {
+            result.put(Constants.DATA_LIST, queueMapper.queryQueueName(queueName));
+            putMsg(result, Status.SUCCESS);
+        }
+        return result;
+    }
+
+ /**
* check queue exist
* if exists return true,not exists return false
* check queue exist
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
index fc01bd7..58ab865 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
@@ -320,8 +320,25 @@ public class TenantServiceImpl extends BaseServiceImpl
implements TenantService
* @param tenantCode tenant code
* @return ture if the tenant code exists, otherwise return false
*/
- private boolean checkTenantExists(String tenantCode) {
+ public boolean checkTenantExists(String tenantCode) {
Boolean existTenant = tenantMapper.existTenant(tenantCode);
return existTenant == Boolean.TRUE;
}
+
+    /**
+     * query tenant by tenant code
+     *
+     * @param tenantCode tenant code
+     * @return result map; SUCCESS with the tenant under DATA_LIST when it exists, otherwise
+     *         TENANT_NOT_EXIST so callers can distinguish a miss from a successful lookup
+     */
+    @Override
+    public Map<String, Object> queryByTenantCode(String tenantCode) {
+        Map<String, Object> result = new HashMap<>();
+        Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
+        if (tenant == null) {
+            // report a miss explicitly instead of returning a map without any status
+            putMsg(result, Status.TENANT_NOT_EXIST);
+            return result;
+        }
+        result.put(Constants.DATA_LIST, tenant);
+        putMsg(result, Status.SUCCESS);
+        return result;
+    }
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index e662347..fc9e36f 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -415,6 +415,7 @@ public final class Constants {
public static final String NULL = "NULL";
public static final String THREAD_NAME_MASTER_SERVER = "Master-Server";
public static final String THREAD_NAME_WORKER_SERVER = "Worker-Server";
+ /**
+ * thread name of the gateway server, alongside the master/worker server thread names above
+ */
+ public static final String THREAD_NAME_GATEWAY_SERVER = "Gateway-Server";
/**
* command parameter keys
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/QueueMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/QueueMapper.java
index 027bfd2..e486070 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/QueueMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/QueueMapper.java
@@ -53,4 +53,11 @@ public interface QueueMapper extends BaseMapper<Queue> {
* @return true if exist else return null
*/
Boolean existQueue(@Param("queue") String queue, @Param("queueName")
String queueName);
+
+ /**
+ * query queue by queue name
+ *
+ * @param queueName queue name to match exactly; when null or empty the filter is skipped
+ * and every queue is returned (see the queryQueueName select in QueueMapper.xml)
+ * @return queue list
+ */
+ List<Queue> queryQueueName(@Param("queueName") String queueName);
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/QueueMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/QueueMapper.xml
index 0d75c6e..cf381e4 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/QueueMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/QueueMapper.xml
@@ -54,4 +54,13 @@
and queue_name =#{queueName}
</if>
</select>
+    <!-- query queues by queue_name; without a name every queue is returned -->
+    <select id="queryQueueName" resultType="org.apache.dolphinscheduler.dao.entity.Queue">
+        select
+        <include refid="baseSql"/>
+        from t_ds_queue
+        <where>
+            <if test="queueName != null and queueName != ''">
+                queue_name = #{queueName}
+            </if>
+        </where>
+    </select>
</mapper>
diff --git a/dolphinscheduler-dist/release-docs/LICENSE
b/dolphinscheduler-dist/release-docs/LICENSE
index 63ab3f6..937216f 100644
--- a/dolphinscheduler-dist/release-docs/LICENSE
+++ b/dolphinscheduler-dist/release-docs/LICENSE
@@ -436,7 +436,7 @@ The text of each license is also included at
licenses/LICENSE-[project].txt.
threetenbp 1.3.6:
https://mvnrepository.com/artifact/org.threeten/threetenbp/1.3.6, BSD 3-clause
xmlenc 0.52: https://mvnrepository.com/artifact/xmlenc/xmlenc/0.52, BSD
hamcrest-core 1.3:
https://mvnrepository.com/artifact/org.hamcrest/hamcrest-core/1.3, BSD 2-Clause
-
+ py4j 0.10.9: https://mvnrepository.com/artifact/net.sf.py4j/py4j/0.10.9, BSD 3-clause
========================================================================
CDDL licenses
diff --git a/dolphinscheduler-dist/release-docs/licenses/LICENSE-py4j.txt
b/dolphinscheduler-dist/release-docs/licenses/LICENSE-py4j.txt
new file mode 100644
index 0000000..2c7adb6
--- /dev/null
+++ b/dolphinscheduler-dist/release-docs/licenses/LICENSE-py4j.txt
@@ -0,0 +1,26 @@
+Copyright (c) 2009-2018, Barthelemy Dagenais and individual contributors. All
+rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+- Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+- Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+- The name of the author may not be used to endorse or promote products
+ derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/dolphinscheduler-python/pom.xml b/dolphinscheduler-python/pom.xml
new file mode 100644
index 0000000..ce7968c
--- /dev/null
+++ b/dolphinscheduler-python/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!--
+ Java side of the Python API: pydolphinscheduler (the Python package next to this pom)
+ accesses the JVM through py4j, see pydolphinscheduler/README.md.
+-->
+<modelVersion>4.0.0</modelVersion>
+<parent>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+</parent>
+<artifactId>dolphinscheduler-python</artifactId>
+<name>${project.artifactId}</name>
+<packaging>jar</packaging>
+
+<dependencies>
+ <!-- dolphinscheduler -->
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-api</artifactId>
+ </dependency>
+
+ <!--springboot-->
+ <!-- NOTE(review): embedded Tomcat and log4j-to-slf4j are excluded here; confirm the reason -->
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-tomcat</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-to-slf4j</artifactId>
+ <groupId>org.apache.logging.log4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- py4j: bridge between Python and the JVM; its version is managed in the parent pom -->
+ <dependency>
+ <groupId>net.sf.py4j</groupId>
+ <artifactId>py4j</artifactId>
+ </dependency>
+
+</dependencies>
+</project>
diff --git a/dolphinscheduler-python/pydolphinscheduler/README.md
b/dolphinscheduler-python/pydolphinscheduler/README.md
new file mode 100644
index 0000000..a987483
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/README.md
@@ -0,0 +1,103 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# pydolphinscheduler
+
+pydolphinscheduler is the Python API for Apache DolphinScheduler, which allows you to define
+your workflow with Python code, aka workflow-as-code.
+
+## Quick Start
+
+> **_Notice:_** For now, pydolphinscheduler has not been released as a binary tarball or to [PyPI][pypi], so you
+> have to clone the Apache DolphinScheduler code from GitHub to follow the quick start setup
+
+Here we show you how to install and run a simple example of pydolphinscheduler
+
+### Prepare
+
+```shell
+# Clone code from github
+git clone git@github.com:apache/dolphinscheduler.git
+
+# Install pydolphinscheduler from source
+cd dolphinscheduler-python/pydolphinscheduler
+python setup.py install
+```
+
+### Start Server And Run Example
+
+Before you run an example, you have to start the backend server. You could follow the [development setup][dev-setup]
+section "DolphinScheduler Standalone Quick Start" to set up a developer environment. You have to start both the backend
+and frontend servers in this step, which means that you can view the DolphinScheduler UI in your browser with URL
+http://localhost:12345/dolphinscheduler
+
+After the backend server has started, all requests from `pydolphinscheduler` will be sent to it.
+For now, we can run a simple example with:
+
+```shell
+cd dolphinscheduler-python/pydolphinscheduler
+python examples/tutorial.py
+```
+
+> **_NOTICE:_** Since Apache DolphinScheduler requires a tenant when running commands, you might need to change the
+> tenant value in `examples/tutorial.py`. For now the value is `tenant_exists`; please change it to a user name that
+> exists in your environment.
+
+After the command executes, you can see a new project with a single process definition named *tutorial* in the [UI][ui-project].
+
+That completes the quick start: we ran an example of pydolphinscheduler. If you want to inspect or join
+pydolphinscheduler development, you could take a look at [develop](#develop)
+
+## Develop
+
+pydolphinscheduler is the Python API for Apache DolphinScheduler; it only defines what a workflow looks like instead of
+storing or executing it. We use [py4j][py4j] to dynamically access the Java Virtual Machine.
+
+### Setup Develop Environment
+
+We already cloned the code in [quick start](#quick-start), so the next step is to open the pydolphinscheduler project
+in your editor. We recommend you use [pycharm][pycharm] instead of [IntelliJ IDEA][idea] to open it. And you could
+just open the directory `dolphinscheduler-python/pydolphinscheduler` instead of `dolphinscheduler-python`.
+
+### Brief Concept
+
+Apache DolphinScheduler is designed to define workflows via the UI, while pydolphinscheduler defines them via code. When
+defining workflows by code, users usually do not care whether the user, tenant, or queue exists. All users care about is
+creating a new workflow from their code definition. So we have some **side objects** in the `pydolphinscheduler/side`
+directory; they only check whether an object exists, and create it if it does not.
+
+#### Process Definition
+
+The pydolphinscheduler workflow object is named process definition, the same as the Java object (it may be renamed
+to something simpler in the future).
+
+#### Tasks
+
+pydolphinscheduler task objects: we use tasks to define the exact job we want DolphinScheduler to do for us. For now,
+we only support the `shell` task to execute shell commands. [This link][all-task] lists all tasks supported in
+DolphinScheduler, which will be implemented in the future.
+
+
+[pypi]: https://pypi.org/
+[dev-setup]:
https://dolphinscheduler.apache.org/en-us/development/development-environment-setup.html
+[ui-project]: http://localhost:12345/dolphinscheduler/ui/#/projects/list
+[py4j]: https://www.py4j.org/index.html
+[pycharm]: https://www.jetbrains.com/pycharm
+[idea]: https://www.jetbrains.com/idea/
+[all-task]:
https://dolphinscheduler.apache.org/en-us/docs/dev/user_doc/guide/task/shell.html
diff --git a/dolphinscheduler-python/pydolphinscheduler/ROADMAP.md
b/dolphinscheduler-python/pydolphinscheduler/ROADMAP.md
new file mode 100644
index 0000000..32ad5e2
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/ROADMAP.md
@@ -0,0 +1,34 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+## Roadmap
+
+### v0.0.3
+
+Add other features, tasks, parameters in DS, keep code coverage up to 90%
+
+### v0.0.2
+
+Add docs about how to use and develop the package, code coverage up to 90%, add CI/CD
+for the package
+
+### v0.0.1(current)
+
+Set up POC, for defining DAG with python code, running DAG manually,
+releasing to pypi
\ No newline at end of file
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py
b/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py
new file mode 100644
index 0000000..7756534
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+r"""
+A tutorial example of pydolphinscheduler.
+
+After this file is submitted to the Apache DolphinScheduler server, a DAG is
+created and the workflow DAG graph is as below:
+
+                  --> task_child_one
+                /                    \
+task_parent -->                        -->  task_union
+                \                    /
+                  --> task_child_two
+"""
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.shell import Shell
+
+# NOTE: a tenant is required to run the workflow; replace "tenant_exists" with one that
+# exists in your environment (see the Quick Start notice in README.md).
+with ProcessDefinition(name="tutorial", tenant="tenant_exists") as pd:
+    task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
+    task_child_one = Shell(name="task_child_one", command="echo 'child one'")
+    task_child_two = Shell(name="task_child_two", command="echo 'child two'")
+    task_union = Shell(name="task_union", command="echo union")
+
+    # Fan out: task_parent runs before both children.
+    task_group = [task_child_one, task_child_two]
+    task_parent.set_downstream(task_group)
+
+    # Fan in: `<<` puts task_group upstream of task_union, matching the graph above.
+    task_union << task_group
+
+    # Send the process definition to the DolphinScheduler backend (see README Quick Start).
+    pd.run()
diff --git a/dolphinscheduler-python/pydolphinscheduler/requirements.txt
b/dolphinscheduler-python/pydolphinscheduler/requirements.txt
new file mode 100644
index 0000000..cdec3ca
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/requirements.txt
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+py4j~=0.10.9.2
\ No newline at end of file
diff --git a/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt
b/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt
new file mode 100644
index 0000000..be98ce9
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# testing
+pytest~=6.2.5
+# code linting and formatting
+flake8-black~=0.2.3
+# flake8
+# flake8-docstrings
+# flake8-black
diff --git a/dolphinscheduler-python/pydolphinscheduler/setup.cfg
b/dolphinscheduler-python/pydolphinscheduler/setup.cfg
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/setup.cfg
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/dolphinscheduler-python/pydolphinscheduler/setup.py
b/dolphinscheduler-python/pydolphinscheduler/setup.py
new file mode 100644
index 0000000..8e9cea4
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/setup.py
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import sys
+from os.path import dirname, join
+
+from setuptools import find_packages, setup
+
+version = '0.0.1.dev0'
+
+if sys.version_info[0] < 3:
+ raise Exception("pydolphinscheduler does not support Python 2. Please
upgrade to Python 3.")
+
+
+def read(*names, **kwargs):
+ return open(
+ join(dirname(__file__), *names), encoding=kwargs.get("encoding",
"utf8")
+ ).read()
+
+
+setup(
+ name="pydolphinscheduler",
+ version=version,
+ license="Apache License 2.0",
+ description="Apache DolphinScheduler python SDK",
+ long_description=read("README.md"),
+ # Make sure pypi is expecting markdown
+ long_description_content_type="text/markdown",
+ author="Apache Software Foundation",
+ author_email="[email protected]",
+ url="https://dolphinscheduler.apache.org/",
+ python_requires=">=3.6",
+ keywords=[
+ "dolphinscheduler",
+ "workflow",
+ "scheduler",
+ "taskflow",
+ ],
+ project_urls={
+ "Homepage": "https://dolphinscheduler.apache.org",
+ "Documentation":
"https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/quick-start.html",
+ "Source": "https://github.com/apache/dolphinscheduler",
+ "Issue Tracker": "https://github.com/apache/dolphinscheduler/issues",
+ "Discussion": "https://github.com/apache/dolphinscheduler/discussions",
+ "Twitter": "https://twitter.com/dolphinschedule",
+ },
+ packages=find_packages(where="src"),
+ package_dir={"": "src"},
+ include_package_data=True,
+ classifiers=[
+ # complete classifier list:
http://pypi.python.org/pypi?%3Aaction=list_classifiers
+ "Development Status :: 1 - Planning",
+ "Environment :: Console",
+ "Intended Audience :: Developers",
+ "License :: OSI Approved :: Apache Software License",
+ "Operating System :: Unix",
+ "Operating System :: POSIX",
+ "Operating System :: Microsoft :: Windows",
+ "Programming Language :: Python",
+ "Programming Language :: Python :: 3",
+ "Programming Language :: Python :: 3.6",
+ "Programming Language :: Python :: 3.7",
+ "Programming Language :: Python :: 3.8",
+ "Programming Language :: Python :: 3.9",
+ "Programming Language :: Python :: Implementation :: CPython",
+ "Programming Language :: Python :: Implementation :: PyPy",
+ "Topic :: Software Development :: User Interfaces",
+ ],
+ install_requires=[
+ # Core
+ "py4j~=0.10",
+ # Dev
+ "pytest~=6.2",
+ ]
+)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/__init__.py
b/dolphinscheduler-python/pydolphinscheduler/src/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
new file mode 100644
index 0000000..eda07aa
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+class ProcessDefinitionReleaseState:
+    """
+    Allowed release states of a ProcessDefinition, as plain strings.
+    """
+
+    OFFLINE: str = "OFFLINE"
+    ONLINE: str = "ONLINE"
+
+
+class ProcessDefinitionDefault:
+    """
+    Default values used when a ProcessDefinition, and the project, tenant,
+    user and queue it relies on, are created without explicit arguments.
+    """
+
+    # Names of the side models and the worker group
+    PROJECT: str = "project-pydolphin"
+    TENANT: str = "tenant_pydolphin"
+    QUEUE: str = "queuePythonGateway"
+    WORKER_GROUP: str = "default"
+
+    # Default user account
+    USER: str = "userPythonGateway"
+    # TODO simple set password same as username
+    # NOTE(review): hard-coded credential; should become configurable.
+    USER_PWD: str = "userPythonGateway"
+    USER_EMAIL: str = "[email protected]"
+    USER_PHONE: str = "11111111111"
+    USER_STATE: int = 1
+
+
+class TaskPriority(str):
+    """Task priority levels, listed from highest to lowest."""
+
+    HIGHEST: str = "HIGHEST"
+    HIGH: str = "HIGH"
+    MEDIUM: str = "MEDIUM"
+    LOW: str = "LOW"
+    LOWEST: str = "LOWEST"
+
+
+class TaskFlag(str):
+    """Yes/no flag values attached to a task."""
+
+    YES: str = "YES"
+    NO: str = "NO"
+
+
+class TaskTimeoutFlag(str):
+    """Task timeout flag values; only ``CLOSE`` is defined so far."""
+
+    CLOSE: str = "CLOSE"
+
+
+class TaskType(str):
+    """Supported task types; only ``SHELL`` is defined so far."""
+
+    SHELL: str = "SHELL"
+
+
+class DefaultTaskCodeNum(str):
+    """
+    Default count of task codes to request from the gateway (presumably;
+    only referenced from commented-out code). Note that ``DEFAULT`` is an
+    int even though the class derives from ``str``.
+    """
+
+    DEFAULT: int = 1
+
+
+class JavaGatewayDefault(str):
+    """
+    Keys and expected values found in result maps returned by the Java gateway.
+    """
+
+    # Status entry and the value that means success
+    RESULT_STATUS_KEYWORD: str = "status"
+    RESULT_STATUS_SUCCESS: str = "SUCCESS"
+
+    # Message entry and the value that means success
+    RESULT_MESSAGE_KEYWORD: str = "msg"
+    RESULT_MESSAGE_SUCCESS: str = "success"
+
+    # Payload entry
+    RESULT_DATA: str = "data"
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py
new file mode 100644
index 0000000..175754f
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional, Dict
+
+# from pydolphinscheduler.side.user import User
+from pydolphinscheduler.utils.string import attr2camel
+
+
+class Base:
+    """
+    Base class for named objects with an optional description.
+
+    Subclasses customise two class-level attribute sets:
+
+    * ``_KEY_ATTR`` -- attributes compared by ``__eq__``.
+    * ``_TO_DICT_ATTR`` -- attributes exported by ``to_dict``.
+    """
+
+    _KEY_ATTR: set = {
+        "name",
+        "description"
+    }
+
+    _TO_DICT_ATTR: set = set()
+
+    DEFAULT_ATTR: Dict = {}
+
+    def __init__(
+        self,
+        name: str,
+        description: Optional[str] = None
+    ):
+        self.name = name
+        self.description = description
+
+    def __repr__(self) -> str:
+        class_name = type(self).__name__
+        return f'<{class_name}: name="{self.name}">'
+
+    def __eq__(self, other):
+        """Equal when both objects have the same type and every ``_KEY_ATTR`` matches."""
+        if type(self) != type(other):
+            return False
+        for key in self._KEY_ATTR:
+            mine = getattr(self, key, None)
+            theirs = getattr(other, key, None)
+            if not (mine == theirs):
+                return False
+        return True
+
+    # TODO DRY
+    def to_dict(self, camel_attr=True) -> Dict:
+        """
+        Export the attributes listed in ``_TO_DICT_ATTR`` as a dict.
+
+        :param camel_attr: when true, keys are converted with ``attr2camel``;
+            otherwise the attribute names are used unchanged.
+        :return: mapping of (possibly converted) attribute name to value;
+            attributes missing on the instance map to ``None``.
+        """
+        def key_of(attr):
+            return attr2camel(attr) if camel_attr else attr
+
+        return {key_of(attr): getattr(self, attr, None) for attr in self._TO_DICT_ATTR}
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base_side.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base_side.py
new file mode 100644
index 0000000..cf0f14e
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base_side.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional
+
+from pydolphinscheduler.constants import ProcessDefinitionDefault
+from pydolphinscheduler.core.base import Base
+
+
+class BaseSide(Base):
+    """
+    Base class for side models (project, tenant, user, queue) that a
+    ProcessDefinition depends on and may need to create before submitting.
+    """
+
+    def __init__(
+        self,
+        name: str,
+        description: Optional[str] = None
+    ):
+        super(BaseSide, self).__init__(name, description)
+
+    @classmethod
+    def create_if_not_exists(
+        cls,
+        # TODO comment for avoiding cycle import
+        # user: Optional[User] = ProcessDefinitionDefault.USER
+        user=ProcessDefinitionDefault.USER
+    ):
+        """
+        Create the side model if it does not exist yet.
+
+        Subclasses must override this; the base implementation always raises.
+
+        :param user: name of the user on whose behalf the model is created.
+        :raises NotImplementedError: always.
+        """
+        raise NotImplementedError
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
new file mode 100644
index 0000000..8d295d0
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -0,0 +1,249 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from typing import Optional, List, Dict, Set
+
+from pydolphinscheduler.constants import ProcessDefinitionReleaseState,
ProcessDefinitionDefault
+from pydolphinscheduler.core.base import Base
+from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.side import Tenant, Project, User
+
+
+class ProcessDefinitionContext:
+    """
+    Holds the ProcessDefinition that is currently active.
+
+    The value lives on the class itself, so there is a single slot; ``set``
+    replaces whatever was stored before.
+    """
+
+    _context_managed_process_definition: Optional["ProcessDefinition"] = None
+
+    @classmethod
+    def set(cls, pd: "ProcessDefinition") -> None:
+        """Make ``pd`` the active process definition."""
+        cls._context_managed_process_definition = pd
+
+    @classmethod
+    def get(cls) -> Optional["ProcessDefinition"]:
+        """Return the active process definition, or ``None`` if none is set."""
+        active = cls._context_managed_process_definition
+        return active
+
+    @classmethod
+    def delete(cls) -> None:
+        """Clear the active process definition."""
+        cls._context_managed_process_definition = None
+
+
+class ProcessDefinition(Base):
+    """
+    Python-side model of a DolphinScheduler process definition (a workflow).
+
+    Tasks are attached explicitly with ``add_task``/``add_tasks``, or
+    implicitly by creating them inside ``with ProcessDefinition(...):``.
+    ``submit`` sends the definition to the Java gateway, ``start`` asks the
+    gateway to run it, and ``run`` does both.
+
+    The user, project, tenant and queue are stored by name. The ``user``,
+    ``project`` and ``tenant`` properties wrap those names in side-model objects.
+
+    TODO: maybe we should rename this class, currently use DS object name
+    """
+
+    # key attribute for identify ProcessDefinition object
+    _KEY_ATTR = {
+        "name",
+        "project",
+        "tenant",
+        "release_state",
+        "param",
+    }
+
+    _TO_DICT_ATTR = {
+        "name",
+        "description",
+        "_project",
+        "_tenant",
+        "timeout",
+        "release_state",
+        "param",
+        "tasks",
+        "task_definition_json",
+        "task_relation_json",
+    }
+
+    def __init__(
+        self,
+        name: str,
+        description: Optional[str] = None,
+        user: Optional[str] = ProcessDefinitionDefault.USER,
+        project: Optional[str] = ProcessDefinitionDefault.PROJECT,
+        tenant: Optional[str] = ProcessDefinitionDefault.TENANT,
+        queue: Optional[str] = ProcessDefinitionDefault.QUEUE,
+        timeout: Optional[int] = 0,
+        release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE,
+        param: Optional[List] = None
+    ):
+        """
+        :param name: name of the process definition.
+        :param description: optional free-text description.
+        :param user: name of the user that submits and starts the definition.
+        :param project: name of the project the definition belongs to.
+        :param tenant: name of the tenant sent with the definition.
+        :param queue: queue name passed on when the tenant is created.
+        :param timeout: timeout value sent to the gateway on submit.
+        :param release_state: one of ``ProcessDefinitionReleaseState``.
+        :param param: global parameters, sent to the gateway as JSON on submit.
+        """
+        super().__init__(name, description)
+        self._user = user
+        self._project = project
+        self._tenant = tenant
+        self._queue = queue
+        self.timeout = timeout
+        self.release_state = release_state
+        self.param = param
+        # Tasks of this definition, keyed by task code.
+        self.tasks: dict = {}
+        # TODO how to fix circle import
+        self._task_relations: Set["TaskRelation"] = set()
+        self._process_definition_code = None
+
+    def __enter__(self) -> "ProcessDefinition":
+        """Make this definition the target for tasks created inside ``with``."""
+        ProcessDefinitionContext.set(self)
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
+        """Clear the active definition; exceptions are not suppressed."""
+        ProcessDefinitionContext.delete()
+
+    @property
+    def tenant(self) -> Tenant:
+        """Tenant side model built from the stored tenant name."""
+        return Tenant(self._tenant)
+
+    @tenant.setter
+    def tenant(self, tenant: Tenant) -> None:
+        self._tenant = tenant.name
+
+    @property
+    def project(self) -> Project:
+        """Project side model built from the stored project name."""
+        return Project(self._project)
+
+    @project.setter
+    def project(self, project: Project) -> None:
+        self._project = project.name
+
+    @property
+    def user(self) -> User:
+        """
+        User side model for the stored user name.
+
+        All other user fields, including tenant and queue, come from
+        ``ProcessDefinitionDefault``, not from this definition.
+        """
+        return User(self._user,
+                    ProcessDefinitionDefault.USER_PWD,
+                    ProcessDefinitionDefault.USER_EMAIL,
+                    ProcessDefinitionDefault.USER_PHONE,
+                    ProcessDefinitionDefault.TENANT,
+                    ProcessDefinitionDefault.QUEUE,
+                    ProcessDefinitionDefault.USER_STATE)
+
+    @property
+    def task_definition_json(self) -> List[Dict]:
+        """Serialized definitions of all tasks; ``[{}]`` when there are none."""
+        if not self.tasks:
+            return [self.tasks]
+        else:
+            return [task.to_dict() for task in self.tasks.values()]
+
+    @property
+    def task_relation_json(self) -> List[Dict]:
+        """
+        Serialized task relations; ``[{}]`` when there are no tasks.
+
+        Side effect: adds a root relation (``pre_task_code=0``) for every task
+        that is not yet the post task of any relation.
+        """
+        if not self.tasks:
+            return [self.tasks]
+        else:
+            self._handle_root_relation()
+            return [tr.to_dict() for tr in self._task_relations]
+
+    # TODO init DAG's tasks are in the same place
+    @property
+    def task_location(self) -> List[Dict]:
+        """Location of every task; all are placed at (0, 0). ``[{}]`` when empty."""
+        if not self.tasks:
+            return [self.tasks]
+        else:
+            return [{"taskCode": task_code, "x": 0, "y": 0} for task_code in self.tasks]
+
+    @property
+    def task_list(self) -> List["Task"]:
+        """Tasks of this definition as a list."""
+        return list(self.tasks.values())
+
+    def _handle_root_relation(self):
+        """Add a ``0 -> task`` relation for each task without an upstream relation."""
+        from pydolphinscheduler.core.task import TaskRelation
+        post_relation_code = set()
+        for relation in self._task_relations:
+            post_relation_code.add(relation.post_task_code)
+        for task in self.task_list:
+            if task.code not in post_relation_code:
+                root_relation = TaskRelation(pre_task_code=0, post_task_code=task.code)
+                self._task_relations.add(root_relation)
+
+    def add_task(self, task: "Task") -> None:
+        """Register ``task`` under its code and bind it to this definition."""
+        self.tasks[task.code] = task
+        task._process_definition = self
+
+    def add_tasks(self, tasks: List["Task"]) -> None:
+        """Register every task in ``tasks``."""
+        for task in tasks:
+            self.add_task(task)
+
+    def get_task(self, code: int) -> "Task":
+        """
+        Return the task registered under ``code``.
+
+        :raises ValueError: if no task with that code exists.
+        """
+        if code not in self.tasks:
+            raise ValueError(
+                f"Task with code {code} can not found in process definition {self.name}"
+            )
+        return self.tasks[code]
+
+    # TODO which typing should return in this case
+    def get_tasks_by_name(self, name: str) -> Set["Task"]:
+        """Return all tasks named ``name`` (possibly empty)."""
+        find = set()
+        for task in self.tasks.values():
+            if task.name == name:
+                find.add(task)
+        return find
+
+    def get_one_task_by_name(self, name: str) -> "Task":
+        """
+        Return an arbitrary task named ``name``.
+
+        :raises ValueError: if no task has that name.
+        """
+        tasks = self.get_tasks_by_name(name)
+        if not tasks:
+            raise ValueError(f"Can not find task with name {name}.")
+        return tasks.pop()
+
+    def run(self):
+        """
+        Run ProcessDefinition instance, a shortcut for :ref: submit and :ref: start.
+
+        Only support manual for now, schedule run will coming soon.
+        """
+        self.submit()
+        self.start()
+
+    def _ensure_side_model_exists(self):
+        """
+        Ensure side model exists which including :ref: Project, Tenant, User.
+
+        If those model not exists, would create default value in :ref: ProcessDefinitionDefault.
+        """
+        # TODO used metaclass for more pythonic
+        self.tenant.create_if_not_exists(self._queue)
+        # model User have to create after Tenant created
+        self.user.create_if_not_exists()
+        # Project model need User object exists
+        self.project.create_if_not_exists(self._user)
+
+    def _param_json(self) -> Optional[str]:
+        """Global params as a JSON string, or ``None`` when there are none."""
+        if not self.param:
+            return None
+        # Already-serialized input is passed through; anything else is
+        # JSON-encoded (str() would produce a Python repr, not JSON).
+        if isinstance(self.param, str):
+            return self.param
+        return json.dumps(self.param)
+
+    def submit(self) -> int:
+        """
+        Submit this definition to the Java gateway (create or update).
+
+        The tenant, user and project are created first if they are missing.
+
+        :return: the process definition code returned by the gateway.
+        """
+        self._ensure_side_model_exists()
+        gateway = launch_gateway()
+        self._process_definition_code = gateway.entry_point.createOrUpdateProcessDefinition(
+            self._user,
+            self._project,
+            self.name,
+            str(self.description) if self.description else "",
+            self._param_json(),
+            json.dumps(self.task_location),
+            self.timeout,
+            self._tenant,
+            # TODO add serialization function
+            json.dumps(self.task_relation_json),
+            json.dumps(self.task_definition_json),
+        )
+        return self._process_definition_code
+
+    def start(self) -> None:
+        """
+        Start ProcessDefinition instance which post to `start-process-instance` to java gateway.
+
+        The worker group ("default") and the final argument (24 * 3600) are
+        hard-coded for now.
+        """
+        gateway = launch_gateway()
+        gateway.entry_point.execProcessInstance(
+            self._user,
+            self._project,
+            self.name,
+            "",
+            "default",
+            24 * 3600,
+        )
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
new file mode 100644
index 0000000..fa97c76
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
@@ -0,0 +1,237 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional, List, Dict, Set, Union, Sequence, Tuple
+
+from pydolphinscheduler.constants import TaskPriority,
ProcessDefinitionDefault, TaskFlag, TaskTimeoutFlag, \
+ DefaultTaskCodeNum, JavaGatewayDefault
+from pydolphinscheduler.core.base import Base
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.process_definition import ProcessDefinitionContext
+from pydolphinscheduler.java_gateway import launch_gateway,
gateway_result_checker
+from pydolphinscheduler.utils.string import snake2camel, class_name2camel
+
+
+class ObjectJsonBase:
+    """
+    Base for plain objects that are serialized for the Java gateway.
+
+    Every instance attribute is exported with its name converted by
+    ``snake2camel``. ``DEFAULT_ATTR`` holds extra constant entries that are
+    merged into the result of ``to_dict``.
+    """
+    DEFAULT_ATTR = {}
+
+    def __init__(self, *args, **kwargs):
+        # Accept and ignore arguments so subclasses can call super().__init__().
+        pass
+
+    def __str__(self) -> str:
+        content = []
+        for attribute, value in self.__dict__.items():
+            content.append(f"\"{snake2camel(attribute)}\": {value}")
+        content = ",".join(content)
+        return f"\"{class_name2camel(type(self).__name__)}\":{{{content}}}"
+
+    # TODO DRY
+    def to_dict(self) -> Dict:
+        """
+        Return instance attributes (keys converted by ``snake2camel``) plus ``DEFAULT_ATTR``.
+
+        ``DEFAULT_ATTR`` entries override instance attributes with the same key.
+        """
+        import copy
+
+        content = {snake2camel(attr): value for attr, value in self.__dict__.items()}
+        # Deep-copy the class-level defaults so that callers mutating the
+        # returned dict (e.g. a nested "conditionParams") cannot alter them.
+        content.update(copy.deepcopy(self.DEFAULT_ATTR))
+        return content
+
+
+class TaskParams(ObjectJsonBase):
+    """
+    Parameters of a task; exported by ``to_dict`` with keys converted by ``snake2camel``.
+    """
+    DEFAULT_CONDITION_RESULT = {
+        "successNode": [
+            ""
+        ],
+        "failedNode": [
+            ""
+        ]
+    }
+
+    def __init__(
+        self,
+        raw_script: str,
+        local_params: Optional[List] = None,
+        resource_list: Optional[List] = None,
+        dependence: Optional[Dict] = None,
+        wait_start_timeout: Optional[Dict] = None,
+        condition_result: Optional[Dict] = None,
+    ):
+        """
+        :param raw_script: script text of the task.
+        :param local_params: local parameters; defaults to an empty list.
+        :param resource_list: resources; defaults to an empty list.
+        :param dependence: dependence settings; defaults to an empty dict.
+        :param wait_start_timeout: wait-start-timeout settings; defaults to an empty dict.
+        :param condition_result: condition result; a falsy value falls back to
+            a private copy of ``DEFAULT_CONDITION_RESULT``.
+        """
+        import copy
+
+        super().__init__()
+        self.raw_script = raw_script
+        self.local_params = local_params or []
+        self.resource_list = resource_list or []
+        self.dependence = dependence or {}
+        self.wait_start_timeout = wait_start_timeout or {}
+        # TODO need better way to handle it, this code just for POC
+        # Copy the class-level default so instances never share one nested
+        # dict/list object.
+        self.condition_result = condition_result or copy.deepcopy(self.DEFAULT_CONDITION_RESULT)
+
+
+class TaskRelation(ObjectJsonBase):
+    """
+    Directed edge from upstream task ``pre_task_code`` to downstream task
+    ``post_task_code``. A ``pre_task_code`` of 0 is used for root relations.
+
+    Two relations are equal (and hash equally) when both codes match, so a
+    set of relations holds each edge only once.
+    """
+    DEFAULT_ATTR = {
+        "name": "",
+        "preTaskVersion": 1,
+        "postTaskVersion": 1,
+        "conditionType": 0,
+        "conditionParams": {}
+    }
+
+    def __init__(
+        self,
+        pre_task_code: int,
+        post_task_code: int,
+    ):
+        super().__init__()
+        self.pre_task_code = pre_task_code
+        self.post_task_code = post_task_code
+
+    def __eq__(self, other):
+        if not isinstance(other, TaskRelation):
+            return NotImplemented
+        return (self.pre_task_code, self.post_task_code) == \
+            (other.pre_task_code, other.post_task_code)
+
+    def __hash__(self):
+        # Previously hashed post_task_code twice and ignored pre_task_code.
+        return hash((self.pre_task_code, self.post_task_code))
+
+
+class Task(Base):
+    """
+    A single task of a ProcessDefinition.
+
+    On construction, the task is bound to ``process_definition``, or to the
+    definition active in the enclosing ``with`` block when none is given.
+    Its code and version are then fetched from the Java gateway, and the
+    task is registered in the definition.
+
+    Dependencies are declared with ``set_upstream``/``set_downstream`` or the
+    ``<<``/``>>`` operators.
+    """
+
+    DEFAULT_DEPS_ATTR = {
+        "name": "",
+        "preTaskVersion": 1,
+        "postTaskVersion": 1,
+        "conditionType": 0,
+        "conditionParams": {}
+    }
+
+    def __init__(
+        self,
+        name: str,
+        task_type: str,
+        task_params: TaskParams,
+        description: Optional[str] = None,
+        flag: Optional[str] = TaskFlag.YES,
+        task_priority: Optional[str] = TaskPriority.MEDIUM,
+        worker_group: Optional[str] = ProcessDefinitionDefault.WORKER_GROUP,
+        delay_time: Optional[int] = 0,
+        fail_retry_times: Optional[int] = 0,
+        fail_retry_interval: Optional[int] = 1,
+        timeout_flag: Optional[str] = TaskTimeoutFlag.CLOSE,
+        timeout_notify_strategy: Optional = None,
+        timeout: Optional[int] = 0,
+        process_definition: Optional[ProcessDefinition] = None,
+    ):
+        """
+        :param name: task name.
+        :param task_type: task type string, e.g. ``TaskType.SHELL``.
+        :param task_params: parameters, serialized with their own ``to_dict``.
+        :param process_definition: definition to attach to; defaults to the one
+            active in the enclosing ``with`` block.
+        :raises ValueError: if no process definition is given and none is active.
+        """
+        super().__init__(name, description)
+        # NOTE: the assignment order below determines the key order of to_dict().
+        self.task_type = task_type
+        self.task_params = task_params
+        self.flag = flag
+        self.task_priority = task_priority
+        self.worker_group = worker_group
+        self.fail_retry_times = fail_retry_times
+        self.fail_retry_interval = fail_retry_interval
+        self.delay_time = delay_time
+        self.timeout_flag = timeout_flag
+        self.timeout_notify_strategy = timeout_notify_strategy
+        self.timeout = timeout
+        self._process_definition = None
+        self.process_definition = process_definition or ProcessDefinitionContext.get()
+        self._upstream_task_codes: Set[int] = set()
+        self._downstream_task_codes: Set[int] = set()
+        self._task_relation: Set[TaskRelation] = set()
+        # code and version need process_definition, so they are set after it
+        self.code, self.version = self.gen_code_and_version()
+        # Add task to process definition, maybe we could put into property process_definition latter
+        if self.process_definition is not None and self.code not in self.process_definition.tasks:
+            self.process_definition.add_task(self)
+
+    @property
+    def process_definition(self) -> Optional[ProcessDefinition]:
+        """
+        Definition this task belongs to.
+
+        :raises ValueError: if the task has not been assigned to one.
+        """
+        if self._process_definition:
+            return self._process_definition
+        else:
+            raise ValueError(f'Task {self} has not been assigned to a ProcessDefinition yet')
+
+    @process_definition.setter
+    def process_definition(self, process_definition: Optional[ProcessDefinition]):
+        self._process_definition = process_definition
+
+    def __hash__(self):
+        return hash(self.code)
+
+    def __lshift__(self, other: Union["Task", Sequence["Task"]]):
+        """Implements ``Task << other``: ``other`` becomes upstream of self; returns ``other``."""
+        self.set_upstream(other)
+        return other
+
+    def __rshift__(self, other: Union["Task", Sequence["Task"]]):
+        """Implements ``Task >> other``: ``other`` becomes downstream of self; returns ``other``."""
+        self.set_downstream(other)
+        return other
+
+    def __rrshift__(self, other: Union["Task", Sequence["Task"]]):
+        """
+        Implements ``[Task] >> Task`` (list has no ``__rshift__``).
+
+        The listed tasks become upstream of self; returns self.
+        """
+        self.__lshift__(other)
+        return self
+
+    def __rlshift__(self, other: Union["Task", Sequence["Task"]]):
+        """
+        Implements ``[Task] << Task`` (list has no ``__lshift__``).
+
+        The listed tasks become downstream of self; returns self.
+        """
+        self.__rshift__(other)
+        return self
+
+    def _set_deps(self, tasks: Union["Task", Sequence["Task"]], upstream: bool = True) -> None:
+        """
+        Link self with ``tasks``: as their downstream if ``upstream`` is true,
+        otherwise as their upstream.
+
+        Also records a TaskRelation in the process definition, if one is assigned.
+        """
+        if not isinstance(tasks, Sequence):
+            tasks = [tasks]
+
+        for task in tasks:
+            pre, post = (task, self) if upstream else (self, task)
+            post._upstream_task_codes.add(pre.code)
+            pre._downstream_task_codes.add(post.code)
+
+            if self._process_definition:
+                task_relation = TaskRelation(
+                    pre_task_code=pre.code,
+                    post_task_code=post.code,
+                )
+                self.process_definition._task_relations.add(task_relation)
+
+    def set_upstream(self, tasks: Union["Task", Sequence["Task"]]) -> None:
+        """Make ``tasks`` upstream of this task."""
+        self._set_deps(tasks, upstream=True)
+
+    def set_downstream(self, tasks: Union["Task", Sequence["Task"]]) -> None:
+        """Make ``tasks`` downstream of this task."""
+        self._set_deps(tasks, upstream=False)
+
+    # TODO code should better generate in bulk mode when :ref: processDefinition run submit or start
+    def gen_code_and_version(self) -> Tuple:
+        """
+        Fetch a code and version for this task from the Java gateway.
+
+        :return: ``(code, version)`` taken from the gateway result.
+        :raises ValueError: if the task has no process definition.
+        """
+        # TODO get code from specific project process definition and task name
+        gateway = launch_gateway()
+        result = gateway.entry_point.getCodeAndVersion(self.process_definition._project, self.name)
+        return result.get("code"), result.get("version")
+
+    def to_dict(self, camel_attr=True) -> Dict:
+        """
+        Export public attributes; TaskParams values are exported via their ``to_dict``.
+
+        :param camel_attr: when true (default), keys are converted with
+            ``snake2camel``; otherwise attribute names are used unchanged.
+        """
+        content = {}
+        for attr, value in self.__dict__.items():
+            # Don't publish private variables
+            if attr.startswith("_"):
+                continue
+            key = snake2camel(attr) if camel_attr else attr
+            if isinstance(value, TaskParams):
+                value = value.to_dict()
+            content[key] = value
+        return content
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
new file mode 100644
index 0000000..e93e8f1
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+
+from py4j.java_collections import JavaMap
+from py4j.java_gateway import JavaGateway, GatewayParameters
+
+from pydolphinscheduler.constants import JavaGatewayDefault
+
+
+def launch_gateway() -> JavaGateway:
+    """
+    Create a py4j client for the Java gateway, with auto conversion enabled.
+
+    ``auto_convert=True`` lets Python collections passed as arguments be
+    converted to Java collections automatically.
+
+    NOTE(review): a new JavaGateway is built on every call and never closed;
+    consider reusing a single instance.
+    """
+    # TODO Note that automatic conversion makes calling Java methods slightly less efficient because
+    # in the worst case, Py4J needs to go through all registered converters for all parameters.
+    # This is why automatic conversion is disabled by default.
+    params = GatewayParameters(auto_convert=True)
+    return JavaGateway(gateway_parameters=params)
+
+
+def gateway_result_checker(
+    result: JavaMap,
+    msg_check: Optional[str] = JavaGatewayDefault.RESULT_MESSAGE_SUCCESS
+) -> Any:
+    """
+    Validate a result map returned by the Java gateway.
+
+    :param result: map with a status entry (whose ``toString()`` is compared)
+        and, when ``msg_check`` is not None, a message entry.
+    :param msg_check: expected message; ``None`` skips the message check.
+    :return: ``result`` unchanged when all checks pass.
+    :raises RuntimeError: if the status is not ``SUCCESS``.
+    :raises ValueError: if the message differs from ``msg_check``.
+    """
+    status = result[JavaGatewayDefault.RESULT_STATUS_KEYWORD].toString()
+    if status != JavaGatewayDefault.RESULT_STATUS_SUCCESS:
+        raise RuntimeError(f"Failed to get result from java gateway, status: {status}")
+    if msg_check is not None:
+        msg = result[JavaGatewayDefault.RESULT_MESSAGE_KEYWORD]
+        if msg != msg_check:
+            raise ValueError(
+                f"Get result state not success, expect message {msg_check!r} but got {msg!r}."
+            )
+    return result
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py
new file mode 100644
index 0000000..2f376a5
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pydolphinscheduler.side.project import Project
+from pydolphinscheduler.side.tenant import Tenant
+from pydolphinscheduler.side.user import User
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py
new file mode 100644
index 0000000..b118be9
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional
+
+from pydolphinscheduler.core.base_side import BaseSide
+from pydolphinscheduler.constants import ProcessDefinitionDefault
+from pydolphinscheduler.java_gateway import launch_gateway,
gateway_result_checker
+
+
+class Project(BaseSide):
+    """
+    DolphinScheduler project side model.
+    """
+
+    def __init__(
+        self,
+        name: str = ProcessDefinitionDefault.PROJECT,
+        description: Optional[str] = None
+    ):
+        super().__init__(name, description)
+
+    def create_if_not_exists(self, user=ProcessDefinitionDefault.USER) -> None:
+        """
+        Ask the Java gateway to create this project for ``user``.
+
+        The gateway result is currently not checked.
+        """
+        entry_point = launch_gateway().entry_point
+        result = entry_point.createProject(user, self.name, self.description)
+        # TODO recover result checker
+        # gateway_result_checker(result, None)
+
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/queue.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/queue.py
new file mode 100644
index 0000000..4c0d1f6
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/queue.py
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional
+
+from pydolphinscheduler.constants import ProcessDefinitionDefault
+from pydolphinscheduler.core.base_side import BaseSide
+from pydolphinscheduler.java_gateway import launch_gateway,
gateway_result_checker
+
+
+class Queue(BaseSide):
+    """
+    DolphinScheduler queue side model.
+    """
+
+    def __init__(
+        self,
+        name: str = ProcessDefinitionDefault.QUEUE,
+        description: Optional[str] = ""
+    ):
+        super().__init__(name, description)
+
+    def create_if_not_exists(self, user=ProcessDefinitionDefault.USER) -> None:
+        """
+        Create Queue if not exists, checking the gateway result.
+
+        :raises RuntimeError: if the gateway result status is not success.
+        """
+        entry_point = launch_gateway().entry_point
+        # Here we set Queue.name and Queue.queueName same as self.name
+        # NOTE(review): this calls createProject rather than a queue-creation
+        # entry point; confirm against the Java gateway before relying on it.
+        result = entry_point.createProject(user, self.name, self.name)
+        gateway_result_checker(result, None)
\ No newline at end of file
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/tenant.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/tenant.py
new file mode 100644
index 0000000..9cba533
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/tenant.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional
+
+from pydolphinscheduler.constants import ProcessDefinitionDefault
+from pydolphinscheduler.core.base_side import BaseSide
+from pydolphinscheduler.java_gateway import launch_gateway,
gateway_result_checker
+
+
+class Tenant(BaseSide):
+    """
+    Tenant side object.
+
+    :param name: Tenant name.
+    :param queue: Queue name used by :meth:`create_if_not_exists` when no
+        explicit ``queue_name`` is given.
+    :param description: Optional tenant description.
+    """
+
+    def __init__(
+        self,
+        name: str = ProcessDefinitionDefault.TENANT,
+        queue: str = ProcessDefinitionDefault.QUEUE,
+        description: Optional[str] = None,
+    ):
+        super().__init__(name, description)
+        self.queue = queue
+
+    def create_if_not_exists(
+        self,
+        queue_name: Optional[str] = None,
+        user=ProcessDefinitionDefault.USER,
+    ) -> None:
+        """
+        Create Tenant if not exists.
+
+        :param queue_name: Queue to pass to ``createTenant``. Falls back to
+            ``self.queue`` when omitted, so the queue given at construction
+            time is actually used.
+        :param user: Currently unused; kept so existing callers keep working.
+        """
+        if queue_name is None:
+            queue_name = self.queue
+        gateway = launch_gateway()
+        result = gateway.entry_point.createTenant(self.name, self.description, queue_name)
+        # TODO recover result checker
+        # gateway_result_checker(result, None)
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py
new file mode 100644
index 0000000..fc7c339
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional
+
+from pydolphinscheduler.core.base_side import BaseSide
+from pydolphinscheduler.java_gateway import launch_gateway,
gateway_result_checker
+
+
+class User(BaseSide):
+    """
+    User side object.
+
+    Holds everything ``create_if_not_exists`` forwards to the gateway's
+    ``createUser`` entry point.
+    """
+
+    # Attribute names registered as key attributes of this side object
+    # (presumably consumed by BaseSide -- TODO confirm).
+    _KEY_ATTR = {"name", "password", "email", "phone", "tenant", "queue", "status"}
+
+    def __init__(
+        self,
+        name: str,
+        password: str,
+        email: str,
+        phone: str,
+        tenant: str,
+        queue: Optional[str] = None,
+        status: Optional[int] = 1,
+    ):
+        super().__init__(name)
+        self.password = password
+        self.email = email
+        self.phone = phone
+        self.tenant = tenant
+        self.queue = queue
+        self.status = status
+
+    def create_if_not_exists(self, **kwargs):
+        """
+        Create User if not exists.
+
+        :param kwargs: Accepted for interface compatibility; not used.
+        """
+        gateway = launch_gateway()
+        # Positional order must match the gateway's createUser signature.
+        create_args = (
+            self.name,
+            self.password,
+            self.email,
+            self.phone,
+            self.tenant,
+            self.queue,
+            self.status,
+        )
+        result = gateway.entry_point.createUser(*create_args)
+        # TODO recover result checker
+        # gateway_result_checker(result, None)
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/worker_group.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/worker_group.py
new file mode 100644
index 0000000..d4b1bb4
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/worker_group.py
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional
+
+from pydolphinscheduler.core.base_side import BaseSide
+
+
+class WorkerGroup(BaseSide):
+    """
+    Worker group side object.
+
+    :param name: Worker group name.
+    :param address: Worker group address, stored as given on ``self.address``.
+    :param description: Optional description.
+    """
+
+    def __init__(self, name: str, address: str, description: Optional[str] = None):
+        super().__init__(name, description)
+        self.address = address
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
new file mode 100644
index 0000000..e60c78b
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.task import Task, TaskParams
+
+
+class Shell(Task):
+    """
+    Shell task: ``command`` becomes the task's raw script.
+
+    :param name: Task name.
+    :param command: Shell command, stored as ``TaskParams.raw_script``.
+    :param task_type: Task type, ``TaskType.SHELL`` by default.
+
+    Any extra positional or keyword arguments are forwarded to ``Task``.
+    """
+
+    # TODO maybe we could use the instance name to replace attribute `name`,
+    # which would simplify usage to `task_shell = Shell(command="echo 1")`
+    # with task.name assigned `task_shell`.
+    def __init__(
+        self,
+        name: str,
+        command: str,
+        task_type: str = TaskType.SHELL,
+        *args, **kwargs
+    ):
+        super().__init__(name, task_type, TaskParams(raw_script=command), *args, **kwargs)
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/__init__.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/string.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/string.py
new file mode 100644
index 0000000..c3bab71
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/string.py
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+def attr2camel(attr: str, include_private=True):
+    """
+    Convert an attribute name to camelCase.
+
+    :param attr: snake_case attribute name, possibly with leading underscores.
+    :param include_private: When true, leading underscores are stripped first,
+        so ``_foo_bar`` maps to the same key as ``foo_bar``.
+    :return: camelCase form of ``attr`` as produced by ``snake2camel``.
+    """
+    name = attr.lstrip("_") if include_private else attr
+    return snake2camel(name)
+
+
+def snake2camel(snake: str):
+    """
+    Convert a snake_case string to camelCase.
+
+    The first component is kept as is. Each following component only has its
+    first character upper-cased. Unlike ``str.title`` this does not lower-case
+    the rest of the component, and it does not capitalize a letter that follows
+    a digit (``"task_2nd"`` -> ``"task2nd"``, not ``"task2Nd"``).
+
+    :param snake: snake_case string.
+    :return: camelCase string; ``""`` for ``""``.
+    """
+    first, *rest = snake.split("_")
+    return first + "".join(part[:1].upper() + part[1:] for part in rest)
+
+
+def class_name2camel(class_name: str):
+    """
+    Convert a PascalCase class name to camelCase by lower-casing its first character.
+
+    :param class_name: Class name, e.g. ``"ProcessDefinition"``.
+    :return: e.g. ``"processDefinition"``; ``""`` for an empty input (slicing
+        avoids the ``IndexError`` that ``class_name[0]`` raised).
+    """
+    return class_name[:1].lower() + class_name[1:]
diff --git a/dolphinscheduler-python/pydolphinscheduler/test/__init__.py
b/dolphinscheduler-python/pydolphinscheduler/test/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/test/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/dolphinscheduler-python/pydolphinscheduler/test/core/__init__.py
b/dolphinscheduler-python/pydolphinscheduler/test/core/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/test/core/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git
a/dolphinscheduler-python/pydolphinscheduler/test/core/test_process_definition.py
b/dolphinscheduler-python/pydolphinscheduler/test/core/test_process_definition.py
new file mode 100644
index 0000000..7155447
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/test/core/test_process_definition.py
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from pydolphinscheduler.constants import ProcessDefinitionDefault,
ProcessDefinitionReleaseState
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.task import Task, TaskParams
+from pydolphinscheduler.side import Tenant, Project, User
+
+TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
+
+
+@pytest.mark.parametrize("func", ["run", "submit", "start"])
+def test_process_definition_key_attr(func):
+    """Test a process definition exposes the attributes `run`, `submit` and `start`."""
+    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+        assert hasattr(pd, func), f"ProcessDefinition instance don't have attribute `{func}`"
+
+
+@pytest.mark.parametrize(
+    "name,value",
+    [
+        ("project", Project(ProcessDefinitionDefault.PROJECT)),
+        ("tenant", Tenant(ProcessDefinitionDefault.TENANT)),
+        ("user", User(ProcessDefinitionDefault.USER,
+                      ProcessDefinitionDefault.USER_PWD,
+                      ProcessDefinitionDefault.USER_EMAIL,
+                      ProcessDefinitionDefault.USER_PHONE,
+                      ProcessDefinitionDefault.TENANT,
+                      ProcessDefinitionDefault.QUEUE,
+                      ProcessDefinitionDefault.USER_STATE)),
+        ("release_state", ProcessDefinitionReleaseState.ONLINE),
+    ],
+)
+def test_process_definition_default_value(name, value):
+    """Test a fresh process definition carries the expected default attribute values."""
+    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+        actual = getattr(pd, name)
+        # Report both sides; the previous message printed the actual value
+        # while labelling it as the expected default.
+        assert actual == value, \
+            f"ProcessDefinition instance attribute `{name}` has value `{actual}`, " \
+            f"expected default value `{value}`"
+
+
+@pytest.mark.parametrize(
+    "name,cls,expect",
+    [
+        ("project", Project, "project"),
+        ("tenant", Tenant, "tenant"),
+    ],
+)
+def test_process_definition_set_attr(name, cls, expect):
+    """Test that assigning a side object to a process definition attribute sticks."""
+    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+        setattr(pd, name, cls(expect))
+        # Compared against a fresh instance on purpose, so equality (not identity) is checked.
+        assert getattr(pd, name) == cls(expect), \
+            f"ProcessDefinition set attribute `{name}` do not work expect"
+
+
+def test_process_definition_to_dict_without_task():
+    """Test `to_dict` of a process definition that has no task."""
+    expect = dict(
+        name=TEST_PROCESS_DEFINITION_NAME,
+        description=None,
+        project=ProcessDefinitionDefault.PROJECT,
+        tenant=ProcessDefinitionDefault.TENANT,
+        timeout=0,
+        releaseState=ProcessDefinitionReleaseState.ONLINE,
+        param=None,
+        tasks={},
+        taskDefinitionJson=[{}],
+        taskRelationJson=[{}],
+    )
+    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+        assert pd.to_dict() == expect
+
+
+def test_process_definition_simple():
+    """Test a linear chain task-0 -> task-1 -> ... built with `set_upstream`."""
+    expect_tasks_num = 5
+    last_idx = expect_tasks_num - 1
+    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+        for idx in range(expect_tasks_num):
+            curr_task = Task(
+                name=f"task-{idx}",
+                task_type=f"type-{idx}",
+                task_params=TaskParams(raw_script=f"test-raw-script-{idx}"),
+            )
+            # Make task idx - 1 the parent of task idx.
+            if idx > 0:
+                curr_task.set_upstream(pd.get_one_task_by_name(f"task-{idx - 1}"))
+        assert len(pd.tasks) == expect_tasks_num
+
+        # Tasks created inside the context are bound to this same definition.
+        assert pd is pd.get_one_task_by_name("task-0").process_definition
+
+        # Every task's upstream/downstream codes match its neighbours in the chain.
+        for idx in range(expect_tasks_num):
+            task = pd.get_one_task_by_name(f"task-{idx}")
+            expect_up = {pd.get_one_task_by_name(f"task-{idx - 1}").code} if idx > 0 else set()
+            expect_down = {pd.get_one_task_by_name(f"task-{idx + 1}").code} if idx < last_idx else set()
+            assert task._upstream_task_codes == expect_up
+            assert task._downstream_task_codes == expect_down
diff --git a/dolphinscheduler-python/pydolphinscheduler/test/core/test_task.py
b/dolphinscheduler-python/pydolphinscheduler/test/core/test_task.py
new file mode 100644
index 0000000..1d89b07
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/test/core/test_task.py
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from unittest.mock import patch
+
+from pydolphinscheduler.core.task import TaskParams, TaskRelation, Task
+
+
+def test_task_params_to_dict():
+    """Test `TaskParams.to_dict` with only `raw_script` given."""
+    raw_script = "test_task_params_to_dict"
+    task_params = TaskParams(raw_script=raw_script)
+    assert task_params.to_dict() == {
+        "resourceList": [],
+        "localParams": [],
+        "rawScript": raw_script,
+        "dependence": {},
+        "conditionResult": TaskParams.DEFAULT_CONDITION_RESULT,
+        "waitStartTimeout": {},
+    }
+
+
+def test_task_relation_to_dict():
+    """Test `TaskRelation.to_dict` with only the pre/post task codes given."""
+    pre_task_code, post_task_code = 123, 456
+    relation = TaskRelation(pre_task_code=pre_task_code, post_task_code=post_task_code)
+    assert relation.to_dict() == {
+        "name": "",
+        "preTaskCode": pre_task_code,
+        "postTaskCode": post_task_code,
+        "preTaskVersion": 1,
+        "postTaskVersion": 1,
+        "conditionType": 0,
+        "conditionParams": {},
+    }
+
+
+def test_task_to_dict():
+    """Test `Task.to_dict` with a patched, fixed task code."""
+    code = "123"
+    name = "test_task_to_dict"
+    task_type = "test_task_to_dict_type"
+    raw_script = "test_task_params_to_dict"
+    expect_task_params = {
+        "resourceList": [],
+        "localParams": [],
+        "rawScript": raw_script,
+        "dependence": {},
+        "conditionResult": {"successNode": [""], "failedNode": [""]},
+        "waitStartTimeout": {},
+    }
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": task_type,
+        "taskParams": expect_task_params,
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "worker-group-pydolphin",
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    # Pin the generated code so the expected dict is deterministic.
+    with patch('pydolphinscheduler.core.task.Task.gen_code', return_value=code):
+        task = Task(name=name, task_type=task_type, task_params=TaskParams(raw_script))
+        assert task.to_dict() == expect
diff --git
a/dolphinscheduler-python/pydolphinscheduler/test/example/__init__.py
b/dolphinscheduler-python/pydolphinscheduler/test/example/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/test/example/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/dolphinscheduler-python/pydolphinscheduler/test/tasks/__init__.py
b/dolphinscheduler-python/pydolphinscheduler/test/tasks/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/test/tasks/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git
a/dolphinscheduler-python/pydolphinscheduler/test/tasks/test_shell.py
b/dolphinscheduler-python/pydolphinscheduler/test/tasks/test_shell.py
new file mode 100644
index 0000000..ff1b7b5
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/test/tasks/test_shell.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from unittest.mock import patch
+
+from pydolphinscheduler.tasks.shell import Shell
+
+
+def test_shell_to_dict():
+    """Test `Shell.to_dict` with a patched, fixed task code."""
+    code = "123"
+    name = "test_shell_to_dict"
+    command = "echo test shell"
+    expect_task_params = {
+        "resourceList": [],
+        "localParams": [],
+        "rawScript": command,
+        "dependence": {},
+        "conditionResult": {"successNode": [""], "failedNode": [""]},
+        "waitStartTimeout": {},
+    }
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": "SHELL",
+        "taskParams": expect_task_params,
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "worker-group-pydolphin",
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    # Pin the generated code so the expected dict is deterministic.
+    with patch('pydolphinscheduler.core.task.Task.gen_code', return_value=code):
+        shell = Shell(name, command)
+        assert shell.to_dict() == expect
diff --git
a/dolphinscheduler-python/pydolphinscheduler/test/test_java_gateway.py
b/dolphinscheduler-python/pydolphinscheduler/test/test_java_gateway.py
new file mode 100644
index 0000000..200c06d
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/test/test_java_gateway.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from py4j.java_gateway import java_import, JavaGateway
+
+
+def test_gateway_connect():
+    """Test the gateway entry point answers `ping` with `PONG`."""
+    gateway = JavaGateway()
+    try:
+        app = gateway.entry_point
+        assert app.ping() == "PONG"
+    finally:
+        # Release this gateway's connections instead of leaking them per test.
+        gateway.close()
+
+
+def test_jvm_simple():
+    """Test basic JVM access through the gateway by reading Integer MIN/MAX values."""
+    gateway = JavaGateway()
+    try:
+        smaller = gateway.jvm.java.lang.Integer.MIN_VALUE
+        bigger = gateway.jvm.java.lang.Integer.MAX_VALUE
+        assert bigger > smaller
+    finally:
+        # Release this gateway's connections instead of leaking them per test.
+        gateway.close()
+
+
+def test_python_client_java_import_single():
+    """Test importing a single Java class into the JVM view."""
+    gateway = JavaGateway()
+    try:
+        java_import(gateway.jvm, "org.apache.dolphinscheduler.common.utils.FileUtils")
+        assert hasattr(gateway.jvm, "FileUtils")
+    finally:
+        # Release this gateway's connections instead of leaking them per test.
+        gateway.close()
+
+
+def test_python_client_java_import_package():
+    """Test a wildcard package import exposes several common utils in the JVM view."""
+    gateway = JavaGateway()
+    try:
+        java_import(gateway.jvm, "org.apache.dolphinscheduler.common.utils.*")
+        # test if jvm view have some common utils
+        for util in ("FileUtils", "OSUtils", "DateUtils"):
+            assert hasattr(gateway.jvm, util)
+    finally:
+        # Release this gateway's connections instead of leaking them per test.
+        gateway.close()
diff --git
a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
new file mode 100644
index 0000000..4a16ab4
--- /dev/null
+++
b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server;
+
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.QueueService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.service.TenantService;
import org.apache.dolphinscheduler.api.service.UsersService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;

import py4j.GatewayServer;
+
+@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
+ @ComponentScan.Filter(type = FilterType.REGEX, pattern = {
+ "org.apache.dolphinscheduler.server.master.*",
+ "org.apache.dolphinscheduler.server.worker.*",
+ "org.apache.dolphinscheduler.server.monitor.*",
+ "org.apache.dolphinscheduler.server.log.*"
+ })
+})
+public class PythonGatewayServer extends SpringBootServletInitializer {
+ @Autowired
+ private ProcessDefinitionMapper processDefinitionMapper;
+
+ @Autowired
+ private ProjectService projectService;
+
+ @Autowired
+ private TenantService tenantService;
+
+ @Autowired
+ private ExecutorService executorService;
+
+ @Autowired
+ private ProcessDefinitionService processDefinitionService;
+
+ @Autowired
+ private TaskDefinitionService taskDefinitionService;
+
+ @Autowired
+ private UsersService usersService;
+
+ @Autowired
+ private QueueService queueService;
+
+ @Autowired
+ private ProjectMapper projectMapper;
+
+ @Autowired
+ private TaskDefinitionMapper taskDefinitionMapper;
+
+ // TODO replace this user to build in admin user if we make sure build in
one could not be change
+ private final User dummyAdminUser = new User() {
+ {
+ setId(Integer.MAX_VALUE);
+ setUserName("dummyUser");
+ setUserType(UserType.ADMIN_USER);
+ }
+ };
+
+ private final Queue queuePythonGateway = new Queue() {
+ {
+ setId(Integer.MAX_VALUE);
+ setQueueName("queuePythonGateway");
+ }
+ };
+
+ public String ping() {
+ return "PONG";
+ }
+
+ // TODO Should we import package in python client side? utils package can
but service can not, why
+ // Core api
+ public Map<String, Object> genTaskCodeList(Integer genNum) {
+ return taskDefinitionService.genTaskCodeList(genNum);
+ }
+
+ public Map<String, Long> getCodeAndVersion(String projectName, String
taskName) throws SnowFlakeUtils.SnowFlakeException {
+ Project project = projectMapper.queryByName(projectName);
+ Map<String, Long> result = new HashMap<>();
+ // project do not exists, mean task not exists too, so we should
directly return init value
+ if (project == null) {
+ result.put("code", SnowFlakeUtils.getInstance().nextId());
+ result.put("version", 0L);
+ return result;
+ }
+ TaskDefinition taskDefinition =
taskDefinitionMapper.queryByName(project.getCode(), taskName);
+ if (taskDefinition == null) {
+ result.put("code", SnowFlakeUtils.getInstance().nextId());
+ result.put("version", 0L);
+ } else {
+ result.put("code", taskDefinition.getCode());
+ result.put("version", (long) taskDefinition.getVersion());
+ }
+ return result;
+ }
+
+ /**
+ * create or update process definition.
+ * If process definition do not exists in Project=`projectCode` would
create a new one
+ * If process definition already exists in Project=`projectCode` would
update it
+ * All requests
+ * <p>
+ *
+ * @param name process definition name
+ * @param description description
+ * @param globalParams global params
+ * @param locations locations for nodes
+ * @param timeout timeout
+ * @param tenantCode tenantCode
+ * @param taskRelationJson relation json for nodes
+ * @param taskDefinitionJson taskDefinitionJson
+ * @return create result code
+ */
+ public Long createOrUpdateProcessDefinition(String userName,
+ String projectName,
+ String name,
+ String description,
+ String globalParams,
+ String locations,
+ int timeout,
+ String tenantCode,
+ String taskRelationJson,
+ String taskDefinitionJson) {
+ User user = usersService.queryUser(userName);
+ Project project = (Project) projectService.queryByName(user,
projectName).get(Constants.DATA_LIST);
+ long projectCode = project.getCode();
+ Map<String, Object> verifyProcessDefinitionExists =
processDefinitionService.verifyProcessDefinitionName(user, projectCode, name);
+
+ if (verifyProcessDefinitionExists.get(Constants.STATUS) !=
Status.SUCCESS) {
+ // update process definition
+ ProcessDefinition processDefinition =
processDefinitionMapper.queryByDefineName(projectCode, name);
+ long processDefinitionCode = processDefinition.getCode();
+ // make sure process definition offline which could edit
+ processDefinitionService.releaseProcessDefinition(user,
projectCode, processDefinitionCode, ReleaseState.OFFLINE);
+ Map<String, Object> result =
processDefinitionService.updateProcessDefinition(user, projectCode, name,
processDefinitionCode, description, globalParams,
+ locations, timeout, tenantCode, taskRelationJson,
taskDefinitionJson);
+ return processDefinitionCode;
+ } else {
+ // create process definition
+ Map<String, Object> result =
processDefinitionService.createProcessDefinition(user, projectCode, name,
description, globalParams,
+ locations, timeout, tenantCode, taskRelationJson,
taskDefinitionJson);
+ ProcessDefinition processDefinition = (ProcessDefinition)
result.get(Constants.DATA_LIST);
+ return processDefinition.getCode();
+ }
+ }
+
+ public void execProcessInstance(String userName,
+ String projectName,
+ String processDefinitionName,
+ String cronTime,
+ String workerGroup,
+ Integer timeout
+ ) {
+ User user = usersService.queryUser(userName);
+ Project project = projectMapper.queryByName(projectName);
+ ProcessDefinition processDefinition =
processDefinitionMapper.queryByDefineName(project.getCode(),
processDefinitionName);
+
+ // temp default value
+ FailureStrategy failureStrategy = FailureStrategy.CONTINUE;
+ TaskDependType taskDependType = TaskDependType.TASK_POST;
+ WarningType warningType = WarningType.NONE;
+ RunMode runMode = RunMode.RUN_MODE_SERIAL;
+ Priority priority = Priority.MEDIUM;
+ int warningGroupId = 0;
+ Long environmentCode = -1L;
+ Map<String, String> startParams = null;
+ Integer expectedParallelismNumber = null;
+ String startNodeList = null;
+
+ // make sure process definition online
+ processDefinitionService.releaseProcessDefinition(user,
project.getCode(), processDefinition.getCode(), ReleaseState.ONLINE);
+
+ executorService.execProcessInstance(user,
+ project.getCode(),
+ processDefinition.getCode(),
+ cronTime,
+ null,
+ failureStrategy,
+ startNodeList,
+ taskDependType,
+ warningType,
+ warningGroupId,
+ runMode,
+ priority,
+ workerGroup,
+ environmentCode,
+ timeout,
+ startParams,
+ expectedParallelismNumber,
+ 0
+ );
+ }
+
+ // side object
+ public Map<String, Object> createProject(String userName, String name,
String desc) {
+ User user = usersService.queryUser(userName);
+ return projectService.createProject(user, name, desc);
+ }
+
+ public Map<String, Object> createQueue(String name, String queueName) {
+ Result<Object> verifyQueueExists = queueService.verifyQueue(name,
queueName);
+ if (verifyQueueExists.getCode() == 0) {
+ return queueService.createQueue(dummyAdminUser, name, queueName);
+ } else {
+ Map<String, Object> result = new HashMap<>();
+ // TODO function putMsg do not work here
+ result.put(Constants.STATUS, Status.SUCCESS);
+ result.put(Constants.MSG, Status.SUCCESS.getMsg());
+ return result;
+ }
+ }
+
+ public Map<String, Object> createTenant(String tenantCode, String desc,
String queueName) throws Exception {
+ if (tenantService.checkTenantExists(tenantCode)) {
+ Map<String, Object> result = new HashMap<>();
+ // TODO function putMsg do not work here
+ result.put(Constants.STATUS, Status.SUCCESS);
+ result.put(Constants.MSG, Status.SUCCESS.getMsg());
+ return result;
+ } else {
+ Result<Object> verifyQueueExists =
queueService.verifyQueue(queueName, queueName);
+ if (verifyQueueExists.getCode() == 0) {
+ // TODO why create do not return id?
+ queueService.createQueue(dummyAdminUser, queueName, queueName);
+ }
+ Map<String, Object> result =
queueService.queryQueueName(queueName);
+ List<Queue> queueList = (List<Queue>)
result.get(Constants.DATA_LIST);
+ Queue queue = queueList.get(0);
+ return tenantService.createTenant(dummyAdminUser, tenantCode,
queue.getId(), desc);
+ }
+ }
+
+ public void createUser(String userName,
+ String userPassword,
+ String email,
+ String phone,
+ String tenantCode,
+ String queue,
+ int state) {
+ User user = usersService.queryUser(userName);
+ if (Objects.isNull(user)) {
+ Map<String, Object> tenantResult =
tenantService.queryByTenantCode(tenantCode);
+ Tenant tenant = (Tenant) tenantResult.get(Constants.DATA_LIST);
+ usersService.createUser(userName, userPassword, email,
tenant.getId(), phone, queue, state);
+ }
+ }
+
+ @PostConstruct
+ public void run() {
+ GatewayServer server = new GatewayServer(this);
+ GatewayServer.turnLoggingOn();
+ // Start server to accept python client RPC
+ server.start();
+ }
+
+ public static void main(String[] args) {
+ SpringApplication.run(PythonGatewayServer.class, args);
+ }
+}
diff --git a/dolphinscheduler-standalone-server/pom.xml
b/dolphinscheduler-standalone-server/pom.xml
index 8b9efc1..83ab68d 100644
--- a/dolphinscheduler-standalone-server/pom.xml
+++ b/dolphinscheduler-standalone-server/pom.xml
@@ -51,6 +51,10 @@
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-alert</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-python</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git
a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java
b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java
index e5a9b00..4a41770 100644
---
a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java
+++
b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java
@@ -66,7 +66,8 @@ public class StandaloneServer {
new SpringApplicationBuilder(
ApiApplicationServer.class,
MasterServer.class,
- WorkerServer.class
+ WorkerServer.class,
+ PythonGatewayServer.class
).run(args);
}
diff --git a/pom.xml b/pom.xml
index b453ce9..5f9f897 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,6 +127,7 @@
<reflections.version>0.9.12</reflections.version>
<byte-buddy.version>1.9.16</byte-buddy.version>
<java-websocket.version>1.5.1</java-websocket.version>
+ <py4j.version>0.10.9</py4j.version>
</properties>
<dependencyManagement>
@@ -263,6 +264,11 @@
<artifactId>dolphinscheduler-spi</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-python</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.curator</groupId>
@@ -678,6 +684,12 @@
<artifactId>javax.mail</artifactId>
<version>1.6.2</version>
</dependency>
+
+ <dependency>
+ <groupId>net.sf.py4j</groupId>
+ <artifactId>py4j</artifactId>
+ <version>${py4j.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
@@ -1234,5 +1246,6 @@
<module>dolphinscheduler-service</module>
<module>dolphinscheduler-microbench</module>
<module>dolphinscheduler-standalone-server</module>
+ <module>dolphinscheduler-python</module>
</modules>
</project>
diff --git a/tools/dependencies/known-dependencies.txt
b/tools/dependencies/known-dependencies.txt
index b19e8b9..7474f00 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -201,6 +201,7 @@ protostuff-core-1.7.2.jar
protostuff-runtime-1.7.2.jar
protostuff-api-1.7.2.jar
protostuff-collectionschema-1.7.2.jar
+py4j-0.10.9.jar
quartz-2.3.0.jar
quartz-jobs-2.3.0.jar
reflections-0.9.12.jar