This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 32d4411 [python] Add flake8 and black for code style and integrated
to GA (#6679)
32d4411 is described below
commit 32d4411469902705f31171d25494e9586560e220
Author: Jiajie Zhong <[email protected]>
AuthorDate: Fri Nov 5 15:14:28 2021 +0800
[python] Add flake8 and black for code style and integrated to GA (#6679)
* [python] Add code style lint for GA
* Change github action name
* Auto change by black
* Fix flake8
* Fix broken link for pyds README.md
* Auto fix by black
* Separate GitHub workflows
* Add Black badge and CI locally in README.md
---
.github/workflows/e2e.yml | 6 +
.github/workflows/{py-tests.yml => py-ci.yml} | 33 ++++-
.github/workflows/unit-test.yml | 2 +
.../{requirements_dev.txt => .flake8} | 27 ++--
.../pydolphinscheduler/README.md | 35 +++++-
.../pydolphinscheduler/examples/bulk_create.py | 9 +-
.../pydolphinscheduler/examples/tutorial.py | 5 +
.../pydolphinscheduler/requirements_dev.txt | 7 +-
.../pydolphinscheduler/setup.py | 10 +-
.../src/pydolphinscheduler/__init__.py | 2 +
.../src/pydolphinscheduler/constants.py | 25 +++-
.../src/pydolphinscheduler/core/__init__.py | 2 +
.../src/pydolphinscheduler/core/base.py | 27 ++--
.../src/pydolphinscheduler/core/base_side.py | 23 ++--
.../pydolphinscheduler/core/process_definition.py | 137 ++++++++++++++-------
.../src/pydolphinscheduler/core/task.py | 128 +++++++++++--------
.../src/pydolphinscheduler/java_gateway.py | 31 +++--
.../src/pydolphinscheduler/side/__init__.py | 2 +
.../src/pydolphinscheduler/side/project.py | 21 ++--
.../src/pydolphinscheduler/side/queue.py | 18 ++-
.../src/pydolphinscheduler/side/tenant.py | 26 ++--
.../src/pydolphinscheduler/side/user.py | 30 ++---
.../src/pydolphinscheduler/side/worker_group.py | 13 +-
.../src/pydolphinscheduler/tasks/__init__.py | 2 +
.../src/pydolphinscheduler/tasks/shell.py | 18 +--
.../src/pydolphinscheduler/utils/__init__.py | 2 +
.../src/pydolphinscheduler/utils/string.py | 6 +
.../pydolphinscheduler/tests/__init__.py | 2 +
.../pydolphinscheduler/tests/core/__init__.py | 2 +
.../tests/core/test_process_definition.py | 71 +++++++----
.../pydolphinscheduler/tests/core/test_task.py | 36 +++---
.../pydolphinscheduler/tests/tasks/__init__.py | 2 +
.../pydolphinscheduler/tests/tasks/test_shell.py | 21 ++--
.../pydolphinscheduler/tests/test_java_gateway.py | 6 +
.../pydolphinscheduler/tests/testing/__init__.py | 2 +
.../pydolphinscheduler/tests/testing/task.py | 5 +
36 files changed, 517 insertions(+), 277 deletions(-)
diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index dfa916a..244b0e5 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -17,7 +17,13 @@
on:
pull_request:
+ paths-ignore:
+ - '**/*.md'
+ - 'dolphinscheduler-python/pydolphinscheduler'
push:
+ paths-ignore:
+ - '**/*.md'
+ - 'dolphinscheduler-python/pydolphinscheduler'
branches:
- dev
diff --git a/.github/workflows/py-tests.yml b/.github/workflows/py-ci.yml
similarity index 69%
rename from .github/workflows/py-tests.yml
rename to .github/workflows/py-ci.yml
index e37cab3..5b8e42a 100644
--- a/.github/workflows/py-tests.yml
+++ b/.github/workflows/py-ci.yml
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-name: Python API Tests
+name: Python API
on:
push:
@@ -30,14 +30,41 @@ defaults:
working-directory: dolphinscheduler-python/pydolphinscheduler
jobs:
- build:
+ sanity:
+ name: Sanity
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v2
+ with:
+ submodules: true
+ - name: Sanity Check
+ uses: ./.github/actions/sanity-check
+ lint:
+ name: Code Style
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v2
+ - name: Set up Python 3.7
+ uses: actions/setup-python@v2
+ with:
+ python-version: 3.7
+ - name: Install Development Dependences
+ run: pip install -r requirements_dev.txt
+ - name: Run Black Checking
+ run: black --check .
+ - name: Run Flake8 Checking
+ run: flake8
+ pytest:
+ name: Pytest
+ needs:
+ - lint
+ - sanity
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
python-version: [3.6, 3.7, 3.8, 3.9]
os: [ubuntu-18.04, macOS-latest, windows-latest]
-
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml
index 3087806..5a1056c 100644
--- a/.github/workflows/unit-test.yml
+++ b/.github/workflows/unit-test.yml
@@ -22,10 +22,12 @@ on:
paths-ignore:
- '**/*.md'
- 'dolphinscheduler-ui'
+ - 'dolphinscheduler-python/pydolphinscheduler'
push:
paths-ignore:
- '**/*.md'
- 'dolphinscheduler-ui'
+ - 'dolphinscheduler-python/pydolphinscheduler'
branches:
- dev
diff --git a/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt
b/dolphinscheduler-python/pydolphinscheduler/.flake8
similarity index 61%
copy from dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt
copy to dolphinscheduler-python/pydolphinscheduler/.flake8
index be98ce9..e676972 100644
--- a/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt
+++ b/dolphinscheduler-python/pydolphinscheduler/.flake8
@@ -15,10 +15,23 @@
# specific language governing permissions and limitations
# under the License.
-# testting
-pytest~=6.2.5
-# code linting and formatting
-flake8-black~=0.2.3
-# flake8
-# flake8-docstrings
-# flake8-black
+[flake8]
+max-line-length = 110
+exclude =
+ .git,
+ __pycache__,
+ .pytest_cache,
+ *.egg-info,
+ docs/source/conf.py
+ old,
+ build,
+ dist,
+ htmlcov
+ignore =
+ # It's clear and not need to add docstring
+ D107, # D107: Don't require docstrings on __init__
+ D105, # D105: Missing docstring in magic method
+ # Conflict to Black
+ W503 # W503: Line breaks before binary operators
+per-file-ignores =
+ src/pydolphinscheduler/side/__init__.py:F401
diff --git a/dolphinscheduler-python/pydolphinscheduler/README.md
b/dolphinscheduler-python/pydolphinscheduler/README.md
index a660984..0cc36d7 100644
--- a/dolphinscheduler-python/pydolphinscheduler/README.md
+++ b/dolphinscheduler-python/pydolphinscheduler/README.md
@@ -20,6 +20,7 @@
# pydolphinscheduler
[![GitHub Build][ga-py-test]][ga]
+[![Code style: black][black-shield]][black-gh]
pydolphinscheduler is python API for Apache DolphinScheduler, which allow you
definition
your workflow by python code, aka workflow-as-codes.
@@ -39,7 +40,7 @@ git clone [email protected]:apache/dolphinscheduler.git
# Install pydolphinscheduler from source
cd dolphinscheduler-python/pydolphinscheduler
-pip setup.py install
+pip install -e .
```
### Start Server And Run Example
@@ -77,6 +78,12 @@ We already clone the code in [quick start](#quick-start), so
next step we have t
in you editor. We recommend you use [pycharm][pycharm] instead of [IntelliJ
IDEA][idea] to open it. And you could
just open directory `dolphinscheduler-python/pydolphinscheduler` instead of
`dolphinscheduler-python`.
+Then you should install the development dependencies so that you can run the tests and
check code style locally.
+
+```shell
+pip install -r requirements_dev.txt
+```
+
### Brief Concept
Apache DolphinScheduler is design to define workflow by UI, and
pydolphinscheduler try to define it by code. When
@@ -95,6 +102,25 @@ pydolphinscheduler tasks object, we use tasks to define
exact job we want Dolphi
we only support `shell` task to execute shell task. [This link][all-task] list
all tasks support in DolphinScheduler
and would be implemented in the further.
+### Code Style
+
+We use [Black][black] as the code formatter and [Flake8][flake8] as the PEP 8
checker. If you use [pycharm][pycharm]
+or [IntelliJ IDEA][idea], you can follow
[Black-integration][black-editor] to configure them in your environment.
+
+Our Python API CI automatically runs the code style checks and unit tests when you submit a pull
request on GitHub; you can also run
+the static checks locally.
+
+```shell
+# We recommend running Black before Flake8, because Black can automatically fix some
code style issues,
+# while Flake8 only reports where the code style does not match PEP 8
+
+# Run Black
+black .
+
+# Run Flake8
+flake8
+```
+
### Testing
pydolphinscheduler using [pytest][pytest] to test our codebase. GitHub Action
will run our test when you create
@@ -115,6 +141,11 @@ PYTHONPATH=src/ pytest
[idea]: https://www.jetbrains.com/idea/
[all-task]:
https://dolphinscheduler.apache.org/en-us/docs/dev/user_doc/guide/task/shell.html
[pytest]: https://docs.pytest.org/en/latest/
+[black]: https://black.readthedocs.io/en/stable/index.html
+[flake8]: https://flake8.pycqa.org/en/latest/index.html
+[black-editor]:
https://black.readthedocs.io/en/stable/integrations/editors.html#pycharm-intellij-idea
<!-- badge -->
-[ga-py-test]:
https://github.com/apache/dolphinscheduler/actions/workflows/py-tests.yml/badge.svg?branch=dev
+[ga-py-test]:
https://github.com/apache/dolphinscheduler/actions/workflows/py-ci.yml/badge.svg?branch=dev
[ga]: https://github.com/apache/dolphinscheduler/actions
+[black-shield]: https://img.shields.io/badge/code%20style-black-000000.svg
+[black-gh]: https://github.com/psf/black
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/bulk_create.py
b/dolphinscheduler-python/pydolphinscheduler/examples/bulk_create.py
index 74203a1..72bdb02 100644
--- a/dolphinscheduler-python/pydolphinscheduler/examples/bulk_create.py
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/bulk_create.py
@@ -16,11 +16,14 @@
# under the License.
"""
-This example show you how to create workflows in batch mode. After this
example run, we will create 10
-workflows named `workflow:<workflow_num>`, and with 3 tasks named
`task:<task_num>-workflow:<workflow_num>`
-in each workflow. Each workflow is linear shape as below, since we set
`IS_CHAIN=True`
+This example show you how to create workflows in batch mode.
+
+After this example run, we will create 10 workflows named
`workflow:<workflow_num>`, and with 3 tasks
+named `task:<task_num>-workflow:<workflow_num>` in each workflow. Task shape
as below
task:1-workflow:1 -> task:2-workflow:1 -> task:3-workflow:1
+
+Each workflow is linear since we set `IS_CHAIN=True`, you could change task to
parallel by set it to `False`.
"""
from pydolphinscheduler.core.process_definition import ProcessDefinition
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py
b/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py
index 7756534..c223836 100644
--- a/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py
@@ -16,6 +16,8 @@
# under the License.
r"""
+A tutorial example take you to experience pydolphinscheduler.
+
After tutorial.py file submit to Apache DolphinScheduler server a DAG would be
create,
and workflow DAG graph as below:
@@ -24,11 +26,14 @@ and workflow DAG graph as below:
task_parent --> --> task_union
\ /
--> task_child_two
+
+it will instantiate and run all the task it have.
"""
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.shell import Shell
+
with ProcessDefinition(name="tutorial", tenant="tenant_exists") as pd:
task_parent = Shell(name="task_parent", command="echo hello
pydolphinscheduler")
task_child_one = Shell(name="task_child_one", command="echo 'child one'")
diff --git a/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt
b/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt
index be98ce9..2c3d409 100644
--- a/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt
+++ b/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt
@@ -18,7 +18,6 @@
# testting
pytest~=6.2.5
# code linting and formatting
-flake8-black~=0.2.3
-# flake8
-# flake8-docstrings
-# flake8-black
+flake8
+flake8-docstrings
+flake8-black
diff --git a/dolphinscheduler-python/pydolphinscheduler/setup.py
b/dolphinscheduler-python/pydolphinscheduler/setup.py
index 8e9cea4..4a6c045 100644
--- a/dolphinscheduler-python/pydolphinscheduler/setup.py
+++ b/dolphinscheduler-python/pydolphinscheduler/setup.py
@@ -15,19 +15,23 @@
# specific language governing permissions and limitations
# under the License.
+"""The script for setting up pydolphinscheduler."""
import sys
from os.path import dirname, join
from setuptools import find_packages, setup
-version = '0.0.1.dev0'
+version = "0.0.1.dev0"
if sys.version_info[0] < 3:
- raise Exception("pydolphinscheduler does not support Python 2. Please
upgrade to Python 3.")
+ raise Exception(
+ "pydolphinscheduler does not support Python 2. Please upgrade to
Python 3."
+ )
def read(*names, **kwargs):
+ """Read file content from given file path."""
return open(
join(dirname(__file__), *names), encoding=kwargs.get("encoding",
"utf8")
).read()
@@ -86,5 +90,5 @@ setup(
"py4j~=0.10",
# Dev
"pytest~=6.2",
- ]
+ ],
)
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py
index 13a8339..701b4cc 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py
@@ -14,3 +14,5 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+"""Init root of pydolphinscheduler."""
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index eda07aa..bdf0d9c 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -15,18 +15,19 @@
# specific language governing permissions and limitations
# under the License.
+"""Constants for pydolphinscheduler."""
+
+
class ProcessDefinitionReleaseState:
- """
- ProcessDefinition release state
- """
+ """Constants for
:class:`pydolphinscheduler.core.process_definition.ProcessDefinition` release
state."""
+
ONLINE: str = "ONLINE"
OFFLINE: str = "OFFLINE"
class ProcessDefinitionDefault:
- """
- ProcessDefinition default values
- """
+ """Constants default value for
:class:`pydolphinscheduler.core.process_definition.ProcessDefinition`."""
+
PROJECT: str = "project-pydolphin"
TENANT: str = "tenant_pydolphin"
USER: str = "userPythonGateway"
@@ -40,6 +41,8 @@ class ProcessDefinitionDefault:
class TaskPriority(str):
+ """Constants for task priority."""
+
HIGHEST = "HIGHEST"
HIGH = "HIGH"
MEDIUM = "MEDIUM"
@@ -48,23 +51,33 @@ class TaskPriority(str):
class TaskFlag(str):
+ """Constants for task flag."""
+
YES = "YES"
NO = "NO"
class TaskTimeoutFlag(str):
+ """Constants for task timeout flag."""
+
CLOSE = "CLOSE"
class TaskType(str):
+ """Constants for task type, it will also show you which kind we support up
to now."""
+
SHELL = "SHELL"
class DefaultTaskCodeNum(str):
+ """Constants and default value for default task code number."""
+
DEFAULT = 1
class JavaGatewayDefault(str):
+ """Constants and default value for java gateway."""
+
RESULT_MESSAGE_KEYWORD = "msg"
RESULT_MESSAGE_SUCCESS = "success"
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
index 13a8339..3fbddf3 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
@@ -14,3 +14,5 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+"""Init pydolphinscheduler.core package."""
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py
index 175754f..ce71a4a 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py
@@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.
+"""DolphinScheduler Base object."""
+
from typing import Optional, Dict
# from pydolphinscheduler.side.user import User
@@ -22,24 +24,15 @@ from pydolphinscheduler.utils.string import attr2camel
class Base:
- """
- Base
- """
+ """DolphinScheduler Base object."""
- _KEY_ATTR: set = {
- "name",
- "description"
- }
+ _KEY_ATTR: set = {"name", "description"}
_TO_DICT_ATTR: set = set()
DEFAULT_ATTR: Dict = {}
- def __init__(
- self,
- name: str,
- description: Optional[str] = None
- ):
+ def __init__(self, name: str, description: Optional[str] = None):
self.name = name
self.description = description
@@ -47,12 +40,18 @@ class Base:
return f'<{type(self).__name__}: name="{self.name}">'
def __eq__(self, other):
- return type(self) == type(other) and \
- all(getattr(self, a, None) == getattr(other, a, None) for a in
self._KEY_ATTR)
+ return type(self) == type(other) and all(
+ getattr(self, a, None) == getattr(other, a, None) for a in
self._KEY_ATTR
+ )
# TODO check how Redash do
# TODO DRY
def to_dict(self, camel_attr=True) -> Dict:
+ """Get object key attribute dict.
+
+ use attribute `self._TO_DICT_ATTR` to determine which attributes
should including to
+ children `to_dict` function.
+ """
# content = {}
# for attr, value in self.__dict__.items():
# # Don't publish private variables
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base_side.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base_side.py
index cf0f14e..ed20d70 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base_side.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base_side.py
@@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.
+"""Module for side object."""
+
from typing import Optional
from pydolphinscheduler.constants import ProcessDefinitionDefault
@@ -22,22 +24,17 @@ from pydolphinscheduler.core.base import Base
class BaseSide(Base):
- def __init__(
- self,
- name: str,
- description: Optional[str] = None
- ):
+ """Base class for side object, it declare base behavior for them."""
+
+ def __init__(self, name: str, description: Optional[str] = None):
super().__init__(name, description)
@classmethod
def create_if_not_exists(
- cls,
- # TODO comment for avoiding cycle import
- # user: Optional[User] = ProcessDefinitionDefault.USER
- user=ProcessDefinitionDefault.USER
+ cls,
+ # TODO comment for avoiding cycle import
+ # user: Optional[User] = ProcessDefinitionDefault.USER
+ user=ProcessDefinitionDefault.USER,
):
- """
- Create Base if not exists
- """
-
+ """Create Base if not exists."""
raise NotImplementedError
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index fa6ad97..500f2d2 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -14,36 +14,46 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+"""Module process definition, core class for workflow define."""
+
import json
from typing import Optional, List, Dict, Set
-from pydolphinscheduler.constants import ProcessDefinitionReleaseState,
ProcessDefinitionDefault
+from pydolphinscheduler.constants import (
+ ProcessDefinitionReleaseState,
+ ProcessDefinitionDefault,
+)
from pydolphinscheduler.core.base import Base
from pydolphinscheduler.java_gateway import launch_gateway
from pydolphinscheduler.side import Tenant, Project, User
class ProcessDefinitionContext:
+ """Class process definition context, use when task get process definition
from context expression."""
+
_context_managed_process_definition: Optional["ProcessDefinition"] = None
@classmethod
def set(cls, pd: "ProcessDefinition") -> None:
+ """Set attribute self._context_managed_process_definition."""
cls._context_managed_process_definition = pd
@classmethod
def get(cls) -> Optional["ProcessDefinition"]:
+ """Get attribute self._context_managed_process_definition."""
return cls._context_managed_process_definition
@classmethod
def delete(cls) -> None:
+ """Delete attribute self._context_managed_process_definition."""
cls._context_managed_process_definition = None
class ProcessDefinition(Base):
- """
- ProcessDefinition
- TODO :ref: comment may not correct ref
- TODO: maybe we should rename this class, currently use DS object name
+ """process definition object, will define process definition attribute,
task, relation.
+
+ TODO: maybe we should rename this class, currently use DS object name.
"""
# key attribute for identify ProcessDefinition object
@@ -70,17 +80,17 @@ class ProcessDefinition(Base):
}
def __init__(
- self,
- name: str,
- description: Optional[str] = None,
- user: Optional[str] = ProcessDefinitionDefault.USER,
- project: Optional[str] = ProcessDefinitionDefault.PROJECT,
- tenant: Optional[str] = ProcessDefinitionDefault.TENANT,
- queue: Optional[str] = ProcessDefinitionDefault.QUEUE,
- worker_group: Optional[str] =
ProcessDefinitionDefault.WORKER_GROUP,
- timeout: Optional[int] = 0,
- release_state: Optional[str] =
ProcessDefinitionReleaseState.ONLINE,
- param: Optional[List] = None
+ self,
+ name: str,
+ description: Optional[str] = None,
+ user: Optional[str] = ProcessDefinitionDefault.USER,
+ project: Optional[str] = ProcessDefinitionDefault.PROJECT,
+ tenant: Optional[str] = ProcessDefinitionDefault.TENANT,
+ queue: Optional[str] = ProcessDefinitionDefault.QUEUE,
+ worker_group: Optional[str] = ProcessDefinitionDefault.WORKER_GROUP,
+ timeout: Optional[int] = 0,
+ release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE,
+ param: Optional[List] = None,
):
super().__init__(name, description)
self._user = user
@@ -93,7 +103,7 @@ class ProcessDefinition(Base):
self.param = param
self.tasks: dict = {}
# TODO how to fix circle import
- self._task_relations: set["TaskRelation"] = set()
+ self._task_relations: set["TaskRelation"] = set() # noqa: F821
self._process_definition_code = None
def __enter__(self) -> "ProcessDefinition":
@@ -105,32 +115,43 @@ class ProcessDefinition(Base):
@property
def tenant(self) -> Tenant:
+ """Get attribute tenant."""
return Tenant(self._tenant)
@tenant.setter
def tenant(self, tenant: Tenant) -> None:
+ """Set attribute tenant."""
self._tenant = tenant.name
@property
def project(self) -> Project:
+ """Get attribute project."""
return Project(self._project)
@project.setter
def project(self, project: Project) -> None:
+ """Set attribute project."""
self._project = project.name
@property
def user(self) -> User:
- return User(self._user,
- ProcessDefinitionDefault.USER_PWD,
- ProcessDefinitionDefault.USER_EMAIL,
- ProcessDefinitionDefault.USER_PHONE,
- ProcessDefinitionDefault.TENANT,
- ProcessDefinitionDefault.QUEUE,
- ProcessDefinitionDefault.USER_STATE)
+ """Get user object.
+
+ For now we just get from python side but not from java gateway side,
so it may not correct.
+ """
+ return User(
+ self._user,
+ ProcessDefinitionDefault.USER_PWD,
+ ProcessDefinitionDefault.USER_EMAIL,
+ ProcessDefinitionDefault.USER_PHONE,
+ ProcessDefinitionDefault.TENANT,
+ ProcessDefinitionDefault.QUEUE,
+ ProcessDefinitionDefault.USER_STATE,
+ )
@property
def task_definition_json(self) -> List[Dict]:
+ """Return all tasks definition in list of dict."""
if not self.tasks:
return [self.tasks]
else:
@@ -138,26 +159,39 @@ class ProcessDefinition(Base):
@property
def task_relation_json(self) -> List[Dict]:
+ """Return all relation between tasks pair in list of dict."""
if not self.tasks:
return [self.tasks]
else:
self._handle_root_relation()
return [tr.to_dict() for tr in self._task_relations]
- # TODO inti DAG's tasks are in the same place
+ # TODO init DAG's tasks are in the same location with default {x: 0, y: 0}
@property
def task_location(self) -> List[Dict]:
+ """Return all tasks location for all process definition.
+
+ For now, we set every task location to the same x and y value of 0,
because we have not
+ found a good way to set task locations. This is a requirement of the java
gateway interface.
+ """
if not self.tasks:
return [self.tasks]
else:
return [{"taskCode": task_code, "x": 0, "y": 0} for task_code in
self.tasks]
@property
- def task_list(self) -> List["Task"]:
+ def task_list(self) -> List["Task"]: # noqa: F821
+ """Return list of tasks objects."""
return list(self.tasks.values())
def _handle_root_relation(self):
+ """Handle root task property
:class:`pydolphinscheduler.core.task.TaskRelation`.
+
+ A root task in the DAG does not have an upstream node, but we have to
add a default
+ upstream task with task_code equal to `0`. This is a requirement of the java
gateway interface.
+ """
from pydolphinscheduler.core.task import TaskRelation
+
post_relation_code = set()
for relation in self._task_relations:
post_relation_code.add(relation.post_task_code)
@@ -166,46 +200,62 @@ class ProcessDefinition(Base):
root_relation = TaskRelation(pre_task_code=0,
post_task_code=task.code)
self._task_relations.add(root_relation)
- def add_task(self, task: "Task") -> None:
+ def add_task(self, task: "Task") -> None: # noqa: F821
+ """Add a single task to process definition."""
self.tasks[task.code] = task
task._process_definition = self
- def add_tasks(self, tasks: List["Task"]) -> None:
+ def add_tasks(self, tasks: List["Task"]) -> None: # noqa: F821
+ """Add task sequence to process definition, it a wrapper of
:func:`add_task`."""
for task in tasks:
self.add_task(task)
- def get_task(self, code: str) -> "Task":
+ def get_task(self, code: str) -> "Task": # noqa: F821
+ """Get task object from process definition by given code."""
if code not in self.tasks:
- raise ValueError("Task with code %s can not found in process
definition %", (code, self.name))
+ raise ValueError(
+ "Task with code %s can not found in process definition %",
+ (code, self.name),
+ )
return self.tasks[code]
# TODO which tying should return in this case
- def get_tasks_by_name(self, name: str) -> Set["Task"]:
+ def get_tasks_by_name(self, name: str) -> Set["Task"]: # noqa: F821
+ """Get tasks object by given name, if will return all tasks with this
name."""
find = set()
for task in self.tasks.values():
if task.name == name:
find.add(task)
return find
- def get_one_task_by_name(self, name: str) -> "Task":
+ def get_one_task_by_name(self, name: str) -> "Task": # noqa: F821
+ """Get exact one task from process definition by given name.
+
+ Function always return one task even though this process definition
have more than one task with
+ this name.
+ """
tasks = self.get_tasks_by_name(name)
if not tasks:
raise ValueError(f"Can not find task with name {name}.")
return tasks.pop()
def run(self):
- """
- Run ProcessDefinition instance, a shortcut for :ref: submit and :ref:
start
- Only support manual for now, schedule run will coming soon
+ """Submit and Start ProcessDefinition instance.
+
+ Shortcut for function :func:`submit` and function :func:`start`. Only
support manual start workflow
+ for now, and schedule run will coming soon.
:return:
"""
self.submit()
self.start()
def _ensure_side_model_exists(self):
- """
- Ensure side model exists which including :ref: Project, Tenant, User.
- If those model not exists, would create default value in :ref:
ProcessDefinitionDefault
+ """Ensure process definition side model exists.
+
+ For now, side object including
:class:`pydolphinscheduler.side.project.Project`,
+ :class:`pydolphinscheduler.side.tenant.Tenant`,
:class:`pydolphinscheduler.side.user.User`.
+ If these model not exists, would create default value in
+ :class:`pydolphinscheduler.constants.ProcessDefinitionDefault`.
"""
# TODO used metaclass for more pythonic
self.tenant.create_if_not_exists(self._queue)
@@ -215,10 +265,7 @@ class ProcessDefinition(Base):
self.project.create_if_not_exists(self._user)
def submit(self) -> int:
- """
- Submit ProcessDefinition instance to java gateway
- :return:
- """
+ """Submit ProcessDefinition instance to java gateway."""
self._ensure_side_model_exists()
gateway = launch_gateway()
self._process_definition_code =
gateway.entry_point.createOrUpdateProcessDefinition(
@@ -238,9 +285,9 @@ class ProcessDefinition(Base):
return self._process_definition_code
def start(self) -> None:
- """
- Start ProcessDefinition instance which post to
`start-process-instance` to java gateway
- :return:
+ """Create and start ProcessDefinition instance.
+
+ which post to `start-process-instance` to java gateway
"""
gateway = launch_gateway()
gateway.entry_point.execProcessInstance(
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
index 51ad74b..6f9e454 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
@@ -15,18 +15,26 @@
# specific language governing permissions and limitations
# under the License.
+"""DolphinScheduler ObjectJsonBase, TaskParams and Task object."""
+
from typing import Optional, List, Dict, Set, Union, Sequence, Tuple
-from pydolphinscheduler.constants import TaskPriority,
ProcessDefinitionDefault, TaskFlag, TaskTimeoutFlag, \
- DefaultTaskCodeNum, JavaGatewayDefault
+from pydolphinscheduler.constants import (
+ TaskPriority,
+ ProcessDefinitionDefault,
+ TaskFlag,
+ TaskTimeoutFlag,
+)
from pydolphinscheduler.core.base import Base
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.core.process_definition import ProcessDefinitionContext
-from pydolphinscheduler.java_gateway import launch_gateway,
gateway_result_checker
+from pydolphinscheduler.java_gateway import launch_gateway
from pydolphinscheduler.utils.string import snake2camel, class_name2camel
class ObjectJsonBase:
+ """Task base class, define `__str__` and `to_dict` function would be use
in other task related class."""
+
DEFAULT_ATTR = {}
def __int__(self, *args, **kwargs):
@@ -35,36 +43,32 @@ class ObjectJsonBase:
def __str__(self) -> str:
content = []
for attribute, value in self.__dict__.items():
- content.append(f"\"{snake2camel(attribute)}\": {value}")
+ content.append(f'"{snake2camel(attribute)}": {value}')
content = ",".join(content)
- return f"\"{class_name2camel(type(self).__name__)}\":{{{content}}}"
+ return f'"{class_name2camel(type(self).__name__)}":{{{content}}}'
# TODO check how Redash do
# TODO DRY
def to_dict(self) -> Dict:
+ """Get object key attribute dict which determine by attribute
`DEFAULT_ATTR`."""
content = {snake2camel(attr): value for attr, value in
self.__dict__.items()}
content.update(self.DEFAULT_ATTR)
return content
class TaskParams(ObjectJsonBase):
- DEFAULT_CONDITION_RESULT = {
- "successNode": [
- ""
- ],
- "failedNode": [
- ""
- ]
- }
+ """TaskParams object, describe the key parameter of a single task."""
+
+ DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}
def __init__(
- self,
- raw_script: str,
- local_params: Optional[List] = None,
- resource_list: Optional[List] = None,
- dependence: Optional[Dict] = None,
- wait_start_timeout: Optional[Dict] = None,
- condition_result: Optional[Dict] = None,
+ self,
+ raw_script: str,
+ local_params: Optional[List] = None,
+ resource_list: Optional[List] = None,
+ dependence: Optional[Dict] = None,
+ wait_start_timeout: Optional[Dict] = None,
+ condition_result: Optional[Dict] = None,
):
super().__init__()
self.raw_script = raw_script
@@ -77,18 +81,20 @@ class TaskParams(ObjectJsonBase):
class TaskRelation(ObjectJsonBase):
+ """TaskRelation object, describe the relation of exactly two tasks."""
+
DEFAULT_ATTR = {
"name": "",
"preTaskVersion": 1,
"postTaskVersion": 1,
"conditionType": 0,
- "conditionParams": {}
+ "conditionParams": {},
}
def __init__(
- self,
- pre_task_code: int,
- post_task_code: int,
+ self,
+ pre_task_code: int,
+ post_task_code: int,
):
super().__init__()
self.pre_task_code = pre_task_code
@@ -99,31 +105,32 @@ class TaskRelation(ObjectJsonBase):
class Task(Base):
+ """Task object, parent class for all exactly task type."""
DEFAULT_DEPS_ATTR = {
"name": "",
"preTaskVersion": 1,
"postTaskVersion": 1,
"conditionType": 0,
- "conditionParams": {}
+ "conditionParams": {},
}
def __init__(
- self,
- name: str,
- task_type: str,
- task_params: TaskParams,
- description: Optional[str] = None,
- flag: Optional[str] = TaskFlag.YES,
- task_priority: Optional[str] = TaskPriority.MEDIUM,
- worker_group: Optional[str] =
ProcessDefinitionDefault.WORKER_GROUP,
- delay_time: Optional[int] = 0,
- fail_retry_times: Optional[int] = 0,
- fail_retry_interval: Optional[int] = 1,
- timeout_flag: Optional[int] = TaskTimeoutFlag.CLOSE,
- timeout_notify_strategy: Optional = None,
- timeout: Optional[int] = 0,
- process_definition: Optional[ProcessDefinition] = None,
+ self,
+ name: str,
+ task_type: str,
+ task_params: TaskParams,
+ description: Optional[str] = None,
+ flag: Optional[str] = TaskFlag.YES,
+ task_priority: Optional[str] = TaskPriority.MEDIUM,
+ worker_group: Optional[str] = ProcessDefinitionDefault.WORKER_GROUP,
+ delay_time: Optional[int] = 0,
+ fail_retry_times: Optional[int] = 0,
+ fail_retry_interval: Optional[int] = 1,
+ timeout_flag: Optional[int] = TaskTimeoutFlag.CLOSE,
+ timeout_notify_strategy: Optional = None,
+ timeout: Optional[int] = 0,
+ process_definition: Optional[ProcessDefinition] = None,
):
super().__init__(name, description)
@@ -139,48 +146,62 @@ class Task(Base):
self.timeout_notify_strategy = timeout_notify_strategy
self.timeout = timeout
self._process_definition = None
- self.process_definition: ProcessDefinition = process_definition or
ProcessDefinitionContext.get()
+ self.process_definition: ProcessDefinition = (
+ process_definition or ProcessDefinitionContext.get()
+ )
self._upstream_task_codes: Set[int] = set()
self._downstream_task_codes: Set[int] = set()
self._task_relation: Set[TaskRelation] = set()
# move attribute code and version after _process_definition and
process_definition declare
self.code, self.version = self.gen_code_and_version()
# Add task to process definition, maybe we could put into property
process_definition latter
- if self.process_definition is not None and self.code not in
self.process_definition.tasks:
+ if (
+ self.process_definition is not None
+ and self.code not in self.process_definition.tasks
+ ):
self.process_definition.add_task(self)
@property
def process_definition(self) -> Optional[ProcessDefinition]:
+ """Get attribute process_definition."""
return self._process_definition
@process_definition.setter
def process_definition(self, process_definition:
Optional[ProcessDefinition]):
+ """Set attribute process_definition."""
self._process_definition = process_definition
def __hash__(self):
return hash(self.code)
def __lshift__(self, other: Union["Task", Sequence["Task"]]):
- """Implements Task << Task"""
+ """Implement Task << Task."""
self.set_upstream(other)
return other
def __rshift__(self, other: Union["Task", Sequence["Task"]]):
- """Implements Task >> Task"""
+ """Implement Task >> Task."""
self.set_downstream(other)
return other
def __rrshift__(self, other: Union["Task", Sequence["Task"]]):
- """Called for Task >> [Task] because list don't have __rshift__
operators."""
+ """Call for Task >> [Task] because list don't have __rshift__
operators."""
self.__lshift__(other)
return self
def __rlshift__(self, other: Union["Task", Sequence["Task"]]):
- """Called for Task << [Task] because list don't have __lshift__
operators."""
+ """Call for Task << [Task] because list don't have __lshift__
operators."""
self.__rshift__(other)
return self
- def _set_deps(self, tasks: Union["Task", Sequence["Task"]], upstream: bool
= True) -> None:
+ def _set_deps(
+ self, tasks: Union["Task", Sequence["Task"]], upstream: bool = True
+ ) -> None:
+ """
+ Set parameter tasks dependent to current task.
+
+ it is a wrapper for :func:`set_upstream` and :func:`set_downstream`.
+ """
if not isinstance(tasks, Sequence):
tasks = [tasks]
@@ -207,21 +228,32 @@ class Task(Base):
self.process_definition._task_relations.add(task_relation)
def set_upstream(self, tasks: Union["Task", Sequence["Task"]]) -> None:
+ """Set parameter tasks as upstream to current task."""
self._set_deps(tasks, upstream=True)
def set_downstream(self, tasks: Union["Task", Sequence["Task"]]) -> None:
+ """Set parameter tasks as downstream to current task."""
self._set_deps(tasks, upstream=False)
# TODO code should better generate in bulk mode when :ref:
processDefinition run submit or start
def gen_code_and_version(self) -> Tuple:
+ """
+ Generate task code and version from java gateway.
+
+ If task name does not exist in process definition before, it will
generate new code and version id
+ equal to 0 by java gateway, otherwise it will return the existing code
and version.
+ """
# TODO get code from specific project process definition and task name
gateway = launch_gateway()
- result =
gateway.entry_point.getCodeAndVersion(self.process_definition._project,
self.name)
+ result = gateway.entry_point.getCodeAndVersion(
+ self.process_definition._project, self.name
+ )
# result =
gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
# gateway_result_checker(result)
return result.get("code"), result.get("version")
def to_dict(self, camel_attr=True) -> Dict:
+ """Task `to_dict` function which will return key attribute for Task
object."""
content = {}
for attr, value in self.__dict__.items():
# Don't publish private variables
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
index e93e8f1..027ca94 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
@@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.
+"""Module java gateway, contain gateway behavior."""
+
from typing import Any, Optional
from py4j.java_collections import JavaMap
@@ -24,20 +26,29 @@ from pydolphinscheduler.constants import JavaGatewayDefault
def launch_gateway() -> JavaGateway:
- # TODO Note that automatic conversion makes calling Java methods slightly
less efficient because
- # in the worst case, Py4J needs to go through all registered converters
for all parameters.
- # This is why automatic conversion is disabled by default.
+ """Launch java gateway to pydolphinscheduler.
+
+ TODO Note that automatic conversion makes calling Java methods slightly
less efficient because
+ in the worst case, Py4J needs to go through all registered converters for
all parameters.
+ This is why automatic conversion is disabled by default.
+ """
gateway =
JavaGateway(gateway_parameters=GatewayParameters(auto_convert=True))
return gateway
def gateway_result_checker(
- result: JavaMap,
- msg_check: Optional[str] = JavaGatewayDefault.RESULT_MESSAGE_SUCCESS
+ result: JavaMap,
+ msg_check: Optional[str] = JavaGatewayDefault.RESULT_MESSAGE_SUCCESS,
) -> Any:
- if result[JavaGatewayDefault.RESULT_STATUS_KEYWORD].toString() != \
- JavaGatewayDefault.RESULT_STATUS_SUCCESS:
- raise RuntimeError(f"Failed when try to got result for java gateway")
- if msg_check is not None and
result[JavaGatewayDefault.RESULT_MESSAGE_KEYWORD] != msg_check:
- raise ValueError(f"Get result state not success.")
+ """Check whether java gateway result is successful or not."""
+ if (
+ result[JavaGatewayDefault.RESULT_STATUS_KEYWORD].toString()
+ != JavaGatewayDefault.RESULT_STATUS_SUCCESS
+ ):
+ raise RuntimeError("Failed when try to got result for java gateway")
+ if (
+ msg_check is not None
+ and result[JavaGatewayDefault.RESULT_MESSAGE_KEYWORD] != msg_check
+ ):
+ raise ValueError("Get result state not success.")
return result
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py
index 2f376a5..de5188c 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py
@@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.
+"""Init Side package, Side package keep object related to DolphinScheduler but
not in the Core part."""
+
from pydolphinscheduler.side.project import Project
from pydolphinscheduler.side.tenant import Tenant
from pydolphinscheduler.side.user import User
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py
index b118be9..96051a2 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py
@@ -15,31 +15,28 @@
# specific language governing permissions and limitations
# under the License.
+"""DolphinScheduler Project object."""
+
from typing import Optional
from pydolphinscheduler.core.base_side import BaseSide
from pydolphinscheduler.constants import ProcessDefinitionDefault
-from pydolphinscheduler.java_gateway import launch_gateway,
gateway_result_checker
+from pydolphinscheduler.java_gateway import launch_gateway
class Project(BaseSide):
- """
- Project
- """
+ """DolphinScheduler Project object."""
def __init__(
- self,
- name: str = ProcessDefinitionDefault.PROJECT,
- description: Optional[str] = None
+ self,
+ name: str = ProcessDefinitionDefault.PROJECT,
+ description: Optional[str] = None,
):
super().__init__(name, description)
def create_if_not_exists(self, user=ProcessDefinitionDefault.USER) -> None:
- """
- Create Project if not exists
- """
+ """Create Project if not exists."""
gateway = launch_gateway()
- result = gateway.entry_point.createProject(user, self.name,
self.description)
+ gateway.entry_point.createProject(user, self.name, self.description)
# TODO recover result checker
# gateway_result_checker(result, None)
-
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/queue.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/queue.py
index 4c0d1f6..7201351 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/queue.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/queue.py
@@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.
+"""DolphinScheduler Queue object."""
+
from typing import Optional
from pydolphinscheduler.constants import ProcessDefinitionDefault
@@ -23,22 +25,18 @@ from pydolphinscheduler.java_gateway import launch_gateway,
gateway_result_check
class Queue(BaseSide):
- """
- Queue
- """
+ """DolphinScheduler Queue object."""
def __init__(
- self,
- name: str = ProcessDefinitionDefault.QUEUE,
- description: Optional[str] = ""
+ self,
+ name: str = ProcessDefinitionDefault.QUEUE,
+ description: Optional[str] = "",
):
super().__init__(name, description)
def create_if_not_exists(self, user=ProcessDefinitionDefault.USER) -> None:
- """
- Create Queue if not exists
- """
+ """Create Queue if not exists."""
gateway = launch_gateway()
# Here we set Queue.name and Queue.queueName same as self.name
result = gateway.entry_point.createProject(user, self.name, self.name)
- gateway_result_checker(result, None)
\ No newline at end of file
+ gateway_result_checker(result, None)
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/tenant.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/tenant.py
index 9cba533..508c033 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/tenant.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/tenant.py
@@ -15,31 +15,31 @@
# specific language governing permissions and limitations
# under the License.
+"""DolphinScheduler Tenant object."""
+
from typing import Optional
from pydolphinscheduler.constants import ProcessDefinitionDefault
from pydolphinscheduler.core.base_side import BaseSide
-from pydolphinscheduler.java_gateway import launch_gateway,
gateway_result_checker
+from pydolphinscheduler.java_gateway import launch_gateway
class Tenant(BaseSide):
- """
- Tenant
- """
+ """DolphinScheduler Tenant object."""
def __init__(
- self,
- name: str = ProcessDefinitionDefault.TENANT,
- queue: str = ProcessDefinitionDefault.QUEUE,
- description: Optional[str] = None
+ self,
+ name: str = ProcessDefinitionDefault.TENANT,
+ queue: str = ProcessDefinitionDefault.QUEUE,
+ description: Optional[str] = None,
):
super().__init__(name, description)
self.queue = queue
- def create_if_not_exists(self, queue_name: str,
user=ProcessDefinitionDefault.USER) -> None:
- """
- Create Tenant if not exists
- """
+ def create_if_not_exists(
+ self, queue_name: str, user=ProcessDefinitionDefault.USER
+ ) -> None:
+ """Create Tenant if not exists."""
gateway = launch_gateway()
- result = gateway.entry_point.createTenant(self.name, self.description,
queue_name)
+ gateway.entry_point.createTenant(self.name, self.description,
queue_name)
# gateway_result_checker(result, None)
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py
index fc7c339..cd0145a 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py
@@ -15,13 +15,17 @@
# specific language governing permissions and limitations
# under the License.
+"""DolphinScheduler User object."""
+
from typing import Optional
from pydolphinscheduler.core.base_side import BaseSide
-from pydolphinscheduler.java_gateway import launch_gateway,
gateway_result_checker
+from pydolphinscheduler.java_gateway import launch_gateway
class User(BaseSide):
+ """DolphinScheduler User object."""
+
_KEY_ATTR = {
"name",
"password",
@@ -33,14 +37,14 @@ class User(BaseSide):
}
def __init__(
- self,
- name: str,
- password: str,
- email: str,
- phone: str,
- tenant: str,
- queue: Optional[str] = None,
- status: Optional[int] = 1,
+ self,
+ name: str,
+ password: str,
+ email: str,
+ phone: str,
+ tenant: str,
+ queue: Optional[str] = None,
+ status: Optional[int] = 1,
):
super().__init__(name)
self.password = password
@@ -51,18 +55,16 @@ class User(BaseSide):
self.status = status
def create_if_not_exists(self, **kwargs):
- """
- Create User if not exists
- """
+ """Create User if not exists."""
gateway = launch_gateway()
- result = gateway.entry_point.createUser(
+ gateway.entry_point.createUser(
self.name,
self.password,
self.email,
self.phone,
self.tenant,
self.queue,
- self.status
+ self.status,
)
# TODO recover result checker
# gateway_result_checker(result, None)
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/worker_group.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/worker_group.py
index d4b1bb4..ed50ec6 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/worker_group.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/worker_group.py
@@ -15,21 +15,16 @@
# specific language governing permissions and limitations
# under the License.
+"""DolphinScheduler Worker Group object."""
+
from typing import Optional
from pydolphinscheduler.core.base_side import BaseSide
class WorkerGroup(BaseSide):
- """
- Worker Group
- """
+ """DolphinScheduler Worker Group object."""
- def __init__(
- self,
- name: str,
- address: str,
- description: Optional[str] = None
- ):
+ def __init__(self, name: str, address: str, description: Optional[str] =
None):
super().__init__(name, description)
self.address = address
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
index 13a8339..9eded99 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
@@ -14,3 +14,5 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+"""Init pydolphinscheduler.tasks package."""
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
index e60c78b..825902f 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
@@ -15,20 +15,22 @@
# specific language governing permissions and limitations
# under the License.
+"""Task shell."""
+
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task, TaskParams
class Shell(Task):
- # TODO maybe we could use instance name to replace attribute `name`
- # which is simplify as `task_shell = Shell(command = "echo 1")` and
- # task.name assign to `task_shell`
+ """Task shell object, declare behavior for shell task to dolphinscheduler.
+
+ TODO maybe we could use instance name to replace attribute `name`
+ which is simplify as `task_shell = Shell(command = "echo 1")` and
+ task.name assign to `task_shell`
+ """
+
def __init__(
- self,
- name: str,
- command: str,
- task_type: str = TaskType.SHELL,
- *args, **kwargs
+ self, name: str, command: str, task_type: str = TaskType.SHELL, *args,
**kwargs
):
task_params = TaskParams(raw_script=command)
super().__init__(name, task_type, task_params, *args, **kwargs)
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/__init__.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/__init__.py
index 13a8339..f8d3fbf 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/__init__.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/__init__.py
@@ -14,3 +14,5 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+"""Init utils package."""
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/string.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/string.py
index c3bab71..3fb6a24 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/string.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/string.py
@@ -15,16 +15,22 @@
# specific language governing permissions and limitations
# under the License.
+"""String util function collections."""
+
+
def attr2camel(attr: str, include_private=True):
+ """Convert class attribute name to camel case."""
if include_private:
attr = attr.lstrip("_")
return snake2camel(attr)
def snake2camel(snake: str):
+ """Convert snake case to camel case."""
components = snake.split("_")
return components[0] + "".join(x.title() for x in components[1:])
def class_name2camel(class_name: str):
+ """Convert class name string to camel case."""
return class_name[0].lower() + class_name[1:]
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/__init__.py
b/dolphinscheduler-python/pydolphinscheduler/tests/__init__.py
index 13a8339..5ce1f82 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/__init__.py
@@ -14,3 +14,5 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+"""Init tests package."""
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/__init__.py
b/dolphinscheduler-python/pydolphinscheduler/tests/core/__init__.py
index 13a8339..62ce0ea 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/__init__.py
@@ -14,3 +14,5 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+"""Init core package tests."""
diff --git
a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
index f4b6b1c..96d0a88 100644
---
a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
+++
b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
@@ -15,9 +15,14 @@
# specific language governing permissions and limitations
# under the License.
+"""Test process definition."""
+
import pytest
-from pydolphinscheduler.constants import ProcessDefinitionDefault,
ProcessDefinitionReleaseState
+from pydolphinscheduler.constants import (
+ ProcessDefinitionDefault,
+ ProcessDefinitionReleaseState,
+)
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.core.task import TaskParams
from pydolphinscheduler.side import Tenant, Project, User
@@ -26,15 +31,13 @@ from tests.testing.task import Task
TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
[email protected](
- "func",
- [
- "run", "submit", "start"
- ]
-)
[email protected]("func", ["run", "submit", "start"])
def test_process_definition_key_attr(func):
+ """Test process definition have specific functions or attributes."""
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
- assert hasattr(pd, func), f"ProcessDefinition instance don't have
attribute `{func}`"
+ assert hasattr(
+ pd, func
+ ), f"ProcessDefinition instance don't have attribute `{func}`"
@pytest.mark.parametrize(
@@ -42,21 +45,29 @@ def test_process_definition_key_attr(func):
[
("project", Project(ProcessDefinitionDefault.PROJECT)),
("tenant", Tenant(ProcessDefinitionDefault.TENANT)),
- ("user", User(ProcessDefinitionDefault.USER,
- ProcessDefinitionDefault.USER_PWD,
- ProcessDefinitionDefault.USER_EMAIL,
- ProcessDefinitionDefault.USER_PHONE,
- ProcessDefinitionDefault.TENANT,
- ProcessDefinitionDefault.QUEUE,
- ProcessDefinitionDefault.USER_STATE)),
+ (
+ "user",
+ User(
+ ProcessDefinitionDefault.USER,
+ ProcessDefinitionDefault.USER_PWD,
+ ProcessDefinitionDefault.USER_EMAIL,
+ ProcessDefinitionDefault.USER_PHONE,
+ ProcessDefinitionDefault.TENANT,
+ ProcessDefinitionDefault.QUEUE,
+ ProcessDefinitionDefault.USER_STATE,
+ ),
+ ),
("worker_group", ProcessDefinitionDefault.WORKER_GROUP),
("release_state", ProcessDefinitionReleaseState.ONLINE),
],
)
def test_process_definition_default_value(name, value):
+ """Test process definition default attributes."""
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
- assert getattr(pd, name) == value, \
- f"ProcessDefinition instance attribute `{name}` have not except
default value `{getattr(pd, name)}`"
+ assert getattr(pd, name) == value, (
+ f"ProcessDefinition instance attribute `{name}` not with "
+ f"except default value `{getattr(pd, name)}`"
+ )
@pytest.mark.parametrize(
@@ -68,13 +79,16 @@ def test_process_definition_default_value(name, value):
],
)
def test_process_definition_set_attr(name, cls, expect):
+ """Test process definition set specific attributes."""
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
setattr(pd, name, cls(expect))
assert getattr(pd, name) == cls(
- expect), f"ProcessDefinition set attribute `{name}` do not work
expect"
+ expect
+ ), f"ProcessDefinition set attribute `{name}` do not work expect"
def test_process_definition_to_dict_without_task():
+ """Test process definition function to_dict without task."""
expect = {
"name": TEST_PROCESS_DEFINITION_NAME,
"description": None,
@@ -93,11 +107,14 @@ def test_process_definition_to_dict_without_task():
def test_process_definition_simple():
+ """Test process definition simple create workflow, including process
definition, task, relation define."""
expect_tasks_num = 5
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
for i in range(expect_tasks_num):
task_params = TaskParams(raw_script=f"test-raw-script-{i}")
- curr_task = Task(name=f"task-{i}", task_type=f"type-{i}",
task_params=task_params)
+ curr_task = Task(
+ name=f"task-{i}", task_type=f"type-{i}",
task_params=task_params
+ )
# Set deps task i as i-1 parent
if i > 0:
pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
@@ -113,10 +130,18 @@ def test_process_definition_simple():
task: Task = pd.get_one_task_by_name(f"task-{i}")
if i == 0:
assert task._upstream_task_codes == set()
- assert task._downstream_task_codes ==
{pd.get_one_task_by_name("task-1").code}
+ assert task._downstream_task_codes == {
+ pd.get_one_task_by_name("task-1").code
+ }
elif i == expect_tasks_num - 1:
- assert task._upstream_task_codes ==
{pd.get_one_task_by_name(f"task-{i - 1}").code}
+ assert task._upstream_task_codes == {
+ pd.get_one_task_by_name(f"task-{i - 1}").code
+ }
assert task._downstream_task_codes == set()
else:
- assert task._upstream_task_codes ==
{pd.get_one_task_by_name(f"task-{i - 1}").code}
- assert task._downstream_task_codes ==
{pd.get_one_task_by_name(f"task-{i + 1}").code}
+ assert task._upstream_task_codes == {
+ pd.get_one_task_by_name(f"task-{i - 1}").code
+ }
+ assert task._downstream_task_codes == {
+ pd.get_one_task_by_name(f"task-{i + 1}").code
+ }
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
index 6e03428..ef5d363 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
+"""Test Task class function."""
from unittest.mock import patch
@@ -22,6 +23,7 @@ from pydolphinscheduler.core.task import TaskParams,
TaskRelation, Task
def test_task_params_to_dict():
+ """Test TaskParams object function to_dict."""
raw_script = "test_task_params_to_dict"
expect = {
"resourceList": [],
@@ -29,13 +31,14 @@ def test_task_params_to_dict():
"rawScript": raw_script,
"dependence": {},
"conditionResult": TaskParams.DEFAULT_CONDITION_RESULT,
- "waitStartTimeout": {}
+ "waitStartTimeout": {},
}
task_param = TaskParams(raw_script=raw_script)
assert task_param.to_dict() == expect
def test_task_relation_to_dict():
+ """Test TaskRelation object function to_dict."""
pre_task_code = 123
post_task_code = 456
expect = {
@@ -45,13 +48,16 @@ def test_task_relation_to_dict():
"preTaskVersion": 1,
"postTaskVersion": 1,
"conditionType": 0,
- "conditionParams": {}
+ "conditionParams": {},
}
- task_param = TaskRelation(pre_task_code=pre_task_code,
post_task_code=post_task_code)
+ task_param = TaskRelation(
+ pre_task_code=pre_task_code, post_task_code=post_task_code
+ )
assert task_param.to_dict() == expect
def test_task_to_dict():
+ """Test Task object function to_dict."""
code = 123
version = 1
name = "test_task_to_dict"
@@ -69,15 +75,8 @@ def test_task_to_dict():
"localParams": [],
"rawScript": raw_script,
"dependence": {},
- "conditionResult": {
- "successNode": [
- ""
- ],
- "failedNode": [
- ""
- ]
- },
- "waitStartTimeout": {}
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
},
"flag": "YES",
"taskPriority": "MEDIUM",
@@ -86,12 +85,11 @@ def test_task_to_dict():
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
"timeoutNotifyStrategy": None,
- "timeout": 0
+ "timeout": 0,
}
- with patch('pydolphinscheduler.core.task.Task.gen_code_and_version',
return_value=(code, version)):
- task = Task(
- name=name,
- task_type=task_type,
- task_params=TaskParams(raw_script)
- )
+ with patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(code, version),
+ ):
+ task = Task(name=name, task_type=task_type,
task_params=TaskParams(raw_script))
assert task.to_dict() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/__init__.py
b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/__init__.py
index 13a8339..095e301 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/__init__.py
@@ -14,3 +14,5 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+"""Init tasks package tests."""
diff --git
a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
index 91cc431..f5f5cfa 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
@@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.
+"""Test Task shell."""
+
from unittest.mock import patch
@@ -22,6 +24,7 @@ from pydolphinscheduler.tasks.shell import Shell
def test_shell_to_dict():
+ """Test task shell function to_dict."""
code = 123
version = 1
name = "test_shell_to_dict"
@@ -38,15 +41,8 @@ def test_shell_to_dict():
"localParams": [],
"rawScript": command,
"dependence": {},
- "conditionResult": {
- "successNode": [
- ""
- ],
- "failedNode": [
- ""
- ]
- },
- "waitStartTimeout": {}
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
},
"flag": "YES",
"taskPriority": "MEDIUM",
@@ -55,8 +51,11 @@ def test_shell_to_dict():
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
"timeoutNotifyStrategy": None,
- "timeout": 0
+ "timeout": 0,
}
- with patch('pydolphinscheduler.core.task.Task.gen_code_and_version',
return_value=(code, version)):
+ with patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(code, version),
+ ):
shell = Shell(name, command)
assert shell.to_dict() == expect
diff --git
a/dolphinscheduler-python/pydolphinscheduler/tests/test_java_gateway.py
b/dolphinscheduler-python/pydolphinscheduler/tests/test_java_gateway.py
index 200c06d..d0456a6 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/test_java_gateway.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/test_java_gateway.py
@@ -15,17 +15,21 @@
# specific language governing permissions and limitations
# under the License.
+"""Test pydolphinscheduler java gateway."""
+
from py4j.java_gateway import java_import, JavaGateway
def test_gateway_connect():
+ """Test whether client could connect to java gateway or not."""
gateway = JavaGateway()
app = gateway.entry_point
assert app.ping() == "PONG"
def test_jvm_simple():
+ """Test use JVM built-in object and operator from java gateway."""
gateway = JavaGateway()
smaller = gateway.jvm.java.lang.Integer.MIN_VALUE
bigger = gateway.jvm.java.lang.Integer.MAX_VALUE
@@ -33,12 +37,14 @@ def test_jvm_simple():
def test_python_client_java_import_single():
+ """Test import single class from java gateway."""
gateway = JavaGateway()
java_import(gateway.jvm,
"org.apache.dolphinscheduler.common.utils.FileUtils")
assert hasattr(gateway.jvm, "FileUtils")
def test_python_client_java_import_package():
+ """Test import package contain multiple class from java gateway."""
gateway = JavaGateway()
java_import(gateway.jvm, "org.apache.dolphinscheduler.common.utils.*")
# test if jvm view have some common utils
diff --git
a/dolphinscheduler-python/pydolphinscheduler/tests/testing/__init__.py
b/dolphinscheduler-python/pydolphinscheduler/tests/testing/__init__.py
index 13a8339..c8caf5b 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/testing/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/testing/__init__.py
@@ -14,3 +14,5 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+"""Init testing package, it provides an easy way for pydolphinscheduler tests."""
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/testing/task.py
b/dolphinscheduler-python/pydolphinscheduler/tests/testing/task.py
index 32d3ffa..e0affc9 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/testing/task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/testing/task.py
@@ -15,13 +15,18 @@
# specific language governing permissions and limitations
# under the License.
+"""Mock class Task for other test."""
+
import uuid
from pydolphinscheduler.core.task import Task as SourceTask
class Task(SourceTask):
+ """Mock class :class:`pydolphinscheduler.core.task.Task` for unittest."""
+
DEFAULT_VERSION = 1
def gen_code_and_version(self):
+ """Mock java gateway code and version, convenience method for
unittest."""
return uuid.uuid1().time, self.DEFAULT_VERSION