This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 6f93ebf [python] Add task dependent (#7405)
6f93ebf is described below
commit 6f93ebf3ba66a2e99e56e1844243dfc75d43556c
Author: Jiajie Zhong <[email protected]>
AuthorDate: Thu Dec 16 09:58:50 2021 +0800
[python] Add task dependent (#7405)
fix: #6926
---
.../examples/task_dependent_example.py | 73 ++
.../src/pydolphinscheduler/constants.py | 1 +
.../src/pydolphinscheduler/tasks/dependent.py | 277 +++++++
.../tests/tasks/test_dependent.py | 793 +++++++++++++++++++++
.../server/PythonGatewayServer.java | 37 +
5 files changed, 1181 insertions(+)
diff --git
a/dolphinscheduler-python/pydolphinscheduler/examples/task_dependent_example.py
b/dolphinscheduler-python/pydolphinscheduler/examples/task_dependent_example.py
new file mode 100644
index 0000000..255d599
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/examples/task_dependent_example.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+r"""
+A example workflow for task dependent.
+
+This example will create two workflows named `task_dependent` and
`task_dependent_external`.
+`task_dependent` is true workflow define and run task dependent, while
`task_dependent_external`
+define outside workflow and task from dependent.
+
+After this script submit, we would get workflow as below:
+
+task_dependent_external:
+
+task_1
+task_2
+task_3
+
+task_dependent:
+
+task_dependent(this task dependent on task_dependent_external.task_1 and
task_dependent_external.task_2).
+"""
+from constants import ProcessDefinitionDefault
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.dependent import And, Dependent, DependentItem,
Or
+from pydolphinscheduler.tasks.shell import Shell
+
+with ProcessDefinition(
+ name="task_dependent_external",
+ tenant="tenant_exists",
+) as pd:
+ task_1 = Shell(name="task_1", command="echo task 1")
+ task_2 = Shell(name="task_2", command="echo task 2")
+ task_3 = Shell(name="task_3", command="echo task 3")
+ pd.submit()
+
+with ProcessDefinition(
+ name="task_dependent",
+ tenant="tenant_exists",
+) as pd:
+ task = Dependent(
+ name="task_dependent",
+ dependence=And(
+ Or(
+ DependentItem(
+ project_name=ProcessDefinitionDefault.PROJECT,
+ process_definition_name="task_dependent_external",
+ dependent_task_name="task_1",
+ ),
+ DependentItem(
+ project_name=ProcessDefinitionDefault.PROJECT,
+ process_definition_name="task_dependent_external",
+ dependent_task_name="task_2",
+ ),
+ )
+ ),
+ )
+ pd.submit()
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index c2899ab..c2d2e7f 100644
---
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -73,6 +73,7 @@ class TaskType(str):
SQL = "SQL"
SUB_PROCESS = "SUB_PROCESS"
PROCEDURE = "PROCEDURE"
+ DEPENDENT = "DEPENDENT"
class DefaultTaskCodeNum(str):
diff --git
a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dependent.py
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dependent.py
new file mode 100644
index 0000000..760ccab
--- /dev/null
+++
b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dependent.py
@@ -0,0 +1,277 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task dependent."""
+
+from typing import Dict, Optional, Tuple
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.base import Base
+from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.exceptions import PyDSJavaGatewayException,
PyDSParamException
+from pydolphinscheduler.java_gateway import launch_gateway
+
+DEPENDENT_ALL_TASK_IN_WORKFLOW = "0"
+
+
+class DependentDate(str):
+ """Constant of Dependent date value.
+
+ These values are set according to the Java server side. If you want to add or change
+ one of them, please change the Java server side first.
+ """
+
+ # TODO Maybe we should add a parent level to DependentDate to make it easier to use,
+ # such as DependentDate.MONTH.THIS_MONTH
+
+ # Hour
+ CURRENT_HOUR = "currentHour"
+ LAST_ONE_HOUR = "last1Hour"
+ LAST_TWO_HOURS = "last2Hours"
+ LAST_THREE_HOURS = "last3Hours"
+ LAST_TWENTY_FOUR_HOURS = "last24Hours"
+
+ # Day
+ TODAY = "today"
+ LAST_ONE_DAYS = "last1Days"
+ LAST_TWO_DAYS = "last2Days"
+ LAST_THREE_DAYS = "last3Days"
+ LAST_SEVEN_DAYS = "last7Days"
+
+ # Week
+ THIS_WEEK = "thisWeek"
+ LAST_WEEK = "lastWeek"
+ LAST_MONDAY = "lastMonday"
+ LAST_TUESDAY = "lastTuesday"
+ LAST_WEDNESDAY = "lastWednesday"
+ LAST_THURSDAY = "lastThursday"
+ LAST_FRIDAY = "lastFriday"
+ LAST_SATURDAY = "lastSaturday"
+ LAST_SUNDAY = "lastSunday"
+
+ # Month
+ THIS_MONTH = "thisMonth"
+ LAST_MONTH = "lastMonth"
+ LAST_MONTH_BEGIN = "lastMonthBegin"
+ LAST_MONTH_END = "lastMonthEnd"
+
+
+class DependentItem(Base):
+ """Dependent item object, minimal unit for task dependent.
+
+ It declare which project, process_definition, task are dependent to this
task.
+ """
+
+ _DEFINE_ATTR = {
+ "project_code",
+ "definition_code",
+ "dep_task_code",
+ "cycle",
+ "date_value",
+ }
+
+ # TODO maybe we should conside overwrite operator `and` and `or` for
DependentItem to
+ # support more easy way to set relation
+ def __init__(
+ self,
+ project_name: str,
+ process_definition_name: str,
+ dependent_task_name: Optional[str] = DEPENDENT_ALL_TASK_IN_WORKFLOW,
+ dependent_date: Optional[DependentDate] = DependentDate.TODAY,
+ ):
+ obj_name =
f"{project_name}.{process_definition_name}.{dependent_task_name}.{dependent_date}"
+ super().__init__(obj_name)
+ self.project_name = project_name
+ self.process_definition_name = process_definition_name
+ self.dependent_task_name = dependent_task_name
+ if dependent_date is None:
+ raise PyDSParamException(
+ "Parameter dependent_date must provider by got None."
+ )
+ else:
+ self.dependent_date = dependent_date
+ self._code = {}
+
+ def __repr__(self) -> str:
+ return "depend_item_list"
+
+ @property
+ def project_code(self) -> str:
+ """Get dependent project code."""
+ return self.get_code_from_gateway().get("projectCode")
+
+ @property
+ def definition_code(self) -> str:
+ """Get dependent definition code."""
+ return self.get_code_from_gateway().get("processDefinitionCode")
+
+ @property
+ def dep_task_code(self) -> str:
+ """Get dependent tasks code list."""
+ if self.is_all_task:
+ return DEPENDENT_ALL_TASK_IN_WORKFLOW
+ else:
+ return self.get_code_from_gateway().get("taskDefinitionCode")
+
+ # TODO Maybe we should get cycle from dependent date class.
+ @property
+ def cycle(self) -> str:
+ """Get dependent cycle."""
+ if "Hour" in self.dependent_date:
+ return "hour"
+ elif self.dependent_date == "today" or "Days" in self.dependent_date:
+ return "day"
+ elif "Month" in self.dependent_date:
+ return "month"
+ else:
+ return "week"
+
+ @property
+ def date_value(self) -> str:
+ """Get dependent date."""
+ return self.dependent_date
+
+ @property
+ def is_all_task(self) -> bool:
+ """Check whether dependent all tasks or not."""
+ return self.dependent_task_name == DEPENDENT_ALL_TASK_IN_WORKFLOW
+
+ @property
+ def code_parameter(self) -> Tuple:
+ """Get name info parameter to query code."""
+ param = (
+ self.project_name,
+ self.process_definition_name,
+ self.dependent_task_name if not self.is_all_task else None,
+ )
+ return param
+
+ def get_code_from_gateway(self) -> Dict:
+ """Get project, definition, task code from given parameter."""
+ if self._code:
+ return self._code
+ else:
+ gateway = launch_gateway()
+ try:
+ self._code =
gateway.entry_point.getDependentInfo(*self.code_parameter)
+ return self._code
+ except Exception:
+ raise PyDSJavaGatewayException("Function get_code_from_gateway
error.")
+
+
+class DependentOperator(Base):
+ """Set DependentItem or dependItemList with specific operator."""
+
+ _DEFINE_ATTR = {
+ "relation",
+ }
+
+ DEPENDENT_ITEM = "DependentItem"
+ DEPENDENT_OPERATOR = "DependentOperator"
+
+ def __init__(self, *args):
+ super().__init__(self.__class__.__name__)
+ self.args = args
+
+ def __repr__(self) -> str:
+ return "depend_task_list"
+
+ @classmethod
+ def operator_name(cls) -> str:
+ """Get operator name in different class."""
+ return cls.__name__.upper()
+
+ @property
+ def relation(self) -> str:
+ """Get operator name in different class, for function
:func:`get_define`."""
+ return self.operator_name()
+
+ def set_define_attr(self) -> str:
+ """Set attribute to function :func:`get_define`.
+
+ It is a wrapper for both `And` and `Or` operator.
+ """
+ result = []
+ attr = None
+ for dependent in self.args:
+ if isinstance(dependent, (DependentItem, DependentOperator)):
+ if attr is None:
+ attr = repr(dependent)
+ elif repr(dependent) != attr:
+ raise PyDSParamException(
+ "Dependent %s operator parameter only support same
type.",
+ self.relation,
+ )
+ else:
+ raise PyDSParamException(
+ "Dependent %s operator parameter support DependentItem and
"
+ "DependentOperator but got %s.",
+ (self.relation, type(dependent)),
+ )
+ result.append(dependent.get_define())
+ setattr(self, attr, result)
+ return attr
+
+ def get_define(self, camel_attr=True) -> Dict:
+ """Overwrite Base.get_define to get task dependent specific get
define."""
+ attr = self.set_define_attr()
+ dependent_define_attr = self._DEFINE_ATTR.union({attr})
+ return super().get_define_custom(
+ camel_attr=True, custom_attr=dependent_define_attr
+ )
+
+
+class And(DependentOperator):
+ """Operator And for task dependent.
+
+ It accepts either :class:`DependentItem` objects or children of :class:`DependentOperator`,
+ and sets the AND condition on those args.
+ """
+
+ def __init__(self, *args):
+ super().__init__(*args)
+
+
+class Or(DependentOperator):
+ """Operator Or for task dependent.
+
+ It accepts either :class:`DependentItem` objects or children of :class:`DependentOperator`,
+ and sets the OR condition on those args.
+ """
+
+ def __init__(self, *args):
+ super().__init__(*args)
+
+
+class Dependent(Task):
+ """Task dependent object, declare behavior for dependent task to
dolphinscheduler."""
+
+ def __init__(self, name: str, dependence: DependentOperator, *args,
**kwargs):
+ super().__init__(name, TaskType.DEPENDENT, *args, **kwargs)
+ self.dependence = dependence
+
+ @property
+ def task_params(self, camel_attr: bool = True, custom_attr: set = None) ->
Dict:
+ """Override Task.task_params for dependent task.
+
+ Dependent task have some specials attribute `dependence`, and in most
of the task
+ this attribute is None and use empty dict `{}` as default value. We do
not use class
+ attribute `_task_custom_attr` due to avoid attribute cover.
+ """
+ params = super().task_params
+ params["dependence"] = self.dependence.get_define()
+ return params
diff --git
a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py
b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py
new file mode 100644
index 0000000..f16e291
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py
@@ -0,0 +1,793 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task dependent."""
+import itertools
+from typing import Dict, List, Optional, Tuple, Union
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.tasks.dependent import (
+ And,
+ Dependent,
+ DependentDate,
+ DependentItem,
+ DependentOperator,
+ Or,
+)
+
+TEST_PROJECT = "test-project"
+TEST_PROCESS_DEFINITION = "test-process-definition"
+TEST_TASK = "test-task"
+TEST_PROJECT_CODE, TEST_DEFINITION_CODE, TEST_TASK_CODE = 12345, 123456,
1234567
+
+TEST_OPERATOR_LIST = ("AND", "OR")
+
+
+@pytest.mark.parametrize(
+ "dep_date, dep_cycle",
+ [
+ # hour
+ (DependentDate.CURRENT_HOUR, "hour"),
+ (DependentDate.LAST_ONE_HOUR, "hour"),
+ (DependentDate.LAST_TWO_HOURS, "hour"),
+ (DependentDate.LAST_THREE_HOURS, "hour"),
+ (DependentDate.LAST_TWENTY_FOUR_HOURS, "hour"),
+ # day
+ (DependentDate.TODAY, "day"),
+ (DependentDate.LAST_ONE_DAYS, "day"),
+ (DependentDate.LAST_TWO_DAYS, "day"),
+ (DependentDate.LAST_THREE_DAYS, "day"),
+ (DependentDate.LAST_SEVEN_DAYS, "day"),
+ # week
+ (DependentDate.THIS_WEEK, "week"),
+ (DependentDate.LAST_WEEK, "week"),
+ (DependentDate.LAST_MONDAY, "week"),
+ (DependentDate.LAST_TUESDAY, "week"),
+ (DependentDate.LAST_WEDNESDAY, "week"),
+ (DependentDate.LAST_THURSDAY, "week"),
+ (DependentDate.LAST_FRIDAY, "week"),
+ (DependentDate.LAST_SATURDAY, "week"),
+ (DependentDate.LAST_SUNDAY, "week"),
+ # month
+ (DependentDate.THIS_MONTH, "month"),
+ (DependentDate.LAST_MONTH, "month"),
+ (DependentDate.LAST_MONTH_BEGIN, "month"),
+ (DependentDate.LAST_MONTH_END, "month"),
+ ],
+)
+@patch(
+ "pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway",
+ return_value={
+ "projectCode": TEST_PROJECT_CODE,
+ "processDefinitionCode": TEST_DEFINITION_CODE,
+ "taskDefinitionCode": TEST_TASK_CODE,
+ },
+)
+def test_dependent_item_get_define(mock_task_info, dep_date, dep_cycle):
+ """Test dependent.DependentItem get define.
+
+ Here we have test some cases as below.
+ ```py
+ {
+ "projectCode": "project code",
+ "definitionCode": "definition code",
+ "depTaskCode": "dep task code",
+ "cycle": "day",
+ "dateValue": "today"
+ }
+ ```
+ """
+ attr = {
+ "project_name": TEST_PROJECT,
+ "process_definition_name": TEST_PROCESS_DEFINITION,
+ "dependent_task_name": TEST_TASK,
+ "dependent_date": dep_date,
+ }
+ expect = {
+ "projectCode": TEST_PROJECT_CODE,
+ "definitionCode": TEST_DEFINITION_CODE,
+ "depTaskCode": TEST_TASK_CODE,
+ "cycle": dep_cycle,
+ "dateValue": dep_date,
+ }
+ task = DependentItem(**attr)
+ assert expect == task.get_define()
+
+
+def test_dependent_item_date_error():
+ """Test error when pass None to dependent_date."""
+ with pytest.raises(
+ PyDSParamException, match="Parameter dependent_date must provider.*?"
+ ):
+ DependentItem(
+ project_name=TEST_PROJECT,
+ process_definition_name=TEST_PROCESS_DEFINITION,
+ dependent_date=None,
+ )
+
+
+@pytest.mark.parametrize(
+ "task_name, result",
+ [
+ ({"dependent_task_name": TEST_TASK}, TEST_TASK),
+ ({}, None),
+ ],
+)
+def test_dependent_item_code_parameter(task_name: dict, result: Optional[str]):
+ """Test dependent item property code_parameter."""
+ dependent_item = DependentItem(
+ project_name=TEST_PROJECT,
+ process_definition_name=TEST_PROCESS_DEFINITION,
+ **task_name,
+ )
+ expect = (TEST_PROJECT, TEST_PROCESS_DEFINITION, result)
+ assert dependent_item.code_parameter == expect
+
+
+@pytest.mark.parametrize(
+ "arg_list",
+ [
+ [1, 2],
+ [
+ DependentItem(
+ project_name=TEST_PROJECT,
+ process_definition_name=TEST_PROCESS_DEFINITION,
+ ),
+ 1,
+ ],
+ [
+ And(
+ DependentItem(
+ project_name=TEST_PROJECT,
+ process_definition_name=TEST_PROCESS_DEFINITION,
+ )
+ ),
+ 1,
+ ],
+ [
+ DependentItem(
+ project_name=TEST_PROJECT,
+ process_definition_name=TEST_PROCESS_DEFINITION,
+ ),
+ And(
+ DependentItem(
+ project_name=TEST_PROJECT,
+ process_definition_name=TEST_PROCESS_DEFINITION,
+ )
+ ),
+ ],
+ ],
+)
+@patch(
+ "pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway",
+ return_value={
+ "projectCode": TEST_PROJECT_CODE,
+ "processDefinitionCode": TEST_DEFINITION_CODE,
+ "taskDefinitionCode": TEST_TASK_CODE,
+ },
+)
+def test_dependent_operator_set_define_error(mock_code, arg_list):
+ """Test dependent operator function :func:`set_define` with not support
type."""
+ dep_op = DependentOperator(*arg_list)
+ with pytest.raises(PyDSParamException, match="Dependent .*? operator.*?"):
+ dep_op.set_define_attr()
+
+
+@pytest.mark.parametrize(
+ # Test dependent operator, Test dependent item parameters, expect operator
define
+ "operators, kwargs, expect",
+ [
+ # Test dependent operator (And | Or) with single dependent item
+ (
+ (And, Or),
+ (
+ {
+ "project_name": TEST_PROJECT,
+ "process_definition_name": TEST_PROCESS_DEFINITION,
+ "dependent_task_name": TEST_TASK,
+ "dependent_date": DependentDate.LAST_MONTH_END,
+ },
+ ),
+ [
+ {
+ "relation": op,
+ "dependItemList": [
+ {
+ "projectCode": TEST_PROJECT_CODE,
+ "definitionCode": TEST_DEFINITION_CODE,
+ "depTaskCode": TEST_TASK_CODE,
+ "cycle": "month",
+ "dateValue": DependentDate.LAST_MONTH_END,
+ },
+ ],
+ }
+ for op in TEST_OPERATOR_LIST
+ ],
+ ),
+ # Test dependent operator (And | Or) with two dependent item
+ (
+ (And, Or),
+ (
+ {
+ "project_name": TEST_PROJECT,
+ "process_definition_name": TEST_PROCESS_DEFINITION,
+ "dependent_task_name": TEST_TASK,
+ "dependent_date": DependentDate.LAST_MONTH_END,
+ },
+ {
+ "project_name": TEST_PROJECT,
+ "process_definition_name": TEST_PROCESS_DEFINITION,
+ "dependent_task_name": TEST_TASK,
+ "dependent_date": DependentDate.LAST_WEEK,
+ },
+ ),
+ [
+ {
+ "relation": op,
+ "dependItemList": [
+ {
+ "projectCode": TEST_PROJECT_CODE,
+ "definitionCode": TEST_DEFINITION_CODE,
+ "depTaskCode": TEST_TASK_CODE,
+ "cycle": "month",
+ "dateValue": DependentDate.LAST_MONTH_END,
+ },
+ {
+ "projectCode": TEST_PROJECT_CODE,
+ "definitionCode": TEST_DEFINITION_CODE,
+ "depTaskCode": TEST_TASK_CODE,
+ "cycle": "week",
+ "dateValue": DependentDate.LAST_WEEK,
+ },
+ ],
+ }
+ for op in TEST_OPERATOR_LIST
+ ],
+ ),
+ # Test dependent operator (And | Or) with multiply dependent item
+ (
+ (And, Or),
+ (
+ {
+ "project_name": TEST_PROJECT,
+ "process_definition_name": TEST_PROCESS_DEFINITION,
+ "dependent_task_name": TEST_TASK,
+ "dependent_date": DependentDate.LAST_MONTH_END,
+ },
+ {
+ "project_name": TEST_PROJECT,
+ "process_definition_name": TEST_PROCESS_DEFINITION,
+ "dependent_task_name": TEST_TASK,
+ "dependent_date": DependentDate.LAST_WEEK,
+ },
+ {
+ "project_name": TEST_PROJECT,
+ "process_definition_name": TEST_PROCESS_DEFINITION,
+ "dependent_task_name": TEST_TASK,
+ "dependent_date": DependentDate.LAST_ONE_DAYS,
+ },
+ ),
+ [
+ {
+ "relation": op,
+ "dependItemList": [
+ {
+ "projectCode": TEST_PROJECT_CODE,
+ "definitionCode": TEST_DEFINITION_CODE,
+ "depTaskCode": TEST_TASK_CODE,
+ "cycle": "month",
+ "dateValue": DependentDate.LAST_MONTH_END,
+ },
+ {
+ "projectCode": TEST_PROJECT_CODE,
+ "definitionCode": TEST_DEFINITION_CODE,
+ "depTaskCode": TEST_TASK_CODE,
+ "cycle": "week",
+ "dateValue": DependentDate.LAST_WEEK,
+ },
+ {
+ "projectCode": TEST_PROJECT_CODE,
+ "definitionCode": TEST_DEFINITION_CODE,
+ "depTaskCode": TEST_TASK_CODE,
+ "cycle": "day",
+ "dateValue": DependentDate.LAST_ONE_DAYS,
+ },
+ ],
+ }
+ for op in TEST_OPERATOR_LIST
+ ],
+ ),
+ ],
+)
+@patch(
+ "pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway",
+ return_value={
+ "projectCode": TEST_PROJECT_CODE,
+ "processDefinitionCode": TEST_DEFINITION_CODE,
+ "taskDefinitionCode": TEST_TASK_CODE,
+ },
+)
+def test_operator_dependent_item(
+ mock_code_info,
+ operators: Tuple[DependentOperator],
+ kwargs: Tuple[dict],
+ expect: List[Dict],
+):
+ """Test DependentOperator(DependentItem) function get_define.
+
+ Here we have test some cases as below, including single dependentItem and
multiply dependentItem.
+ ```py
+ {
+ "relation": "AND",
+ "dependItemList": [
+ {
+ "projectCode": "project code",
+ "definitionCode": "definition code",
+ "depTaskCode": "dep task code",
+ "cycle": "day",
+ "dateValue": "today"
+ },
+ ...
+ ]
+ }
+ ```
+ """
+ for idx, operator in enumerate(operators):
+ # Use variable to keep one or more dependent item to test dependent
operator behavior
+ dependent_item_list = []
+ for kwarg in kwargs:
+ dependent_item = DependentItem(**kwarg)
+ dependent_item_list.append(dependent_item)
+ op = operator(*dependent_item_list)
+ assert expect[idx] == op.get_define()
+
+
+@pytest.mark.parametrize(
+ # Test dependent operator, Test dependent item parameters, expect operator
define
+ "operators, args, expect",
+ [
+ # Test dependent operator (And | Or) with single dependent task list
+ (
+ (And, Or),
+ (
+ (And, Or),
+ (
+ {
+ "project_name": TEST_PROJECT,
+ "process_definition_name": TEST_PROCESS_DEFINITION,
+ "dependent_task_name": TEST_TASK,
+ "dependent_date": DependentDate.LAST_MONTH_END,
+ },
+ ),
+ ),
+ [
+ {
+ "relation": par_op,
+ "dependTaskList": [
+ {
+ "relation": chr_op,
+ "dependItemList": [
+ {
+ "projectCode": TEST_PROJECT_CODE,
+ "definitionCode": TEST_DEFINITION_CODE,
+ "depTaskCode": TEST_TASK_CODE,
+ "cycle": "month",
+ "dateValue": DependentDate.LAST_MONTH_END,
+ },
+ ],
+ }
+ ],
+ }
+ for (par_op, chr_op) in itertools.product(
+ TEST_OPERATOR_LIST, TEST_OPERATOR_LIST
+ )
+ ],
+ ),
+ # Test dependent operator (And | Or) with two dependent task list
+ (
+ (And, Or),
+ (
+ (And, Or),
+ (
+ {
+ "project_name": TEST_PROJECT,
+ "process_definition_name": TEST_PROCESS_DEFINITION,
+ "dependent_task_name": TEST_TASK,
+ "dependent_date": DependentDate.LAST_MONTH_END,
+ },
+ {
+ "project_name": TEST_PROJECT,
+ "process_definition_name": TEST_PROCESS_DEFINITION,
+ "dependent_task_name": TEST_TASK,
+ "dependent_date": DependentDate.LAST_WEEK,
+ },
+ ),
+ ),
+ [
+ {
+ "relation": par_op,
+ "dependTaskList": [
+ {
+ "relation": chr_op,
+ "dependItemList": [
+ {
+ "projectCode": TEST_PROJECT_CODE,
+ "definitionCode": TEST_DEFINITION_CODE,
+ "depTaskCode": TEST_TASK_CODE,
+ "cycle": "month",
+ "dateValue": DependentDate.LAST_MONTH_END,
+ },
+ {
+ "projectCode": TEST_PROJECT_CODE,
+ "definitionCode": TEST_DEFINITION_CODE,
+ "depTaskCode": TEST_TASK_CODE,
+ "cycle": "week",
+ "dateValue": DependentDate.LAST_WEEK,
+ },
+ ],
+ }
+ ],
+ }
+ for (par_op, chr_op) in itertools.product(
+ TEST_OPERATOR_LIST, TEST_OPERATOR_LIST
+ )
+ ],
+ ),
+ # Test dependent operator (And | Or) with multiply dependent task list
+ (
+ (And, Or),
+ (
+ (And, Or),
+ (
+ {
+ "project_name": TEST_PROJECT,
+ "process_definition_name": TEST_PROCESS_DEFINITION,
+ "dependent_task_name": TEST_TASK,
+ "dependent_date": DependentDate.LAST_MONTH_END,
+ },
+ {
+ "project_name": TEST_PROJECT,
+ "process_definition_name": TEST_PROCESS_DEFINITION,
+ "dependent_task_name": TEST_TASK,
+ "dependent_date": DependentDate.LAST_WEEK,
+ },
+ {
+ "project_name": TEST_PROJECT,
+ "process_definition_name": TEST_PROCESS_DEFINITION,
+ "dependent_task_name": TEST_TASK,
+ "dependent_date": DependentDate.LAST_ONE_DAYS,
+ },
+ ),
+ ),
+ [
+ {
+ "relation": par_op,
+ "dependTaskList": [
+ {
+ "relation": chr_op,
+ "dependItemList": [
+ {
+ "projectCode": TEST_PROJECT_CODE,
+ "definitionCode": TEST_DEFINITION_CODE,
+ "depTaskCode": TEST_TASK_CODE,
+ "cycle": "month",
+ "dateValue": DependentDate.LAST_MONTH_END,
+ },
+ {
+ "projectCode": TEST_PROJECT_CODE,
+ "definitionCode": TEST_DEFINITION_CODE,
+ "depTaskCode": TEST_TASK_CODE,
+ "cycle": "week",
+ "dateValue": DependentDate.LAST_WEEK,
+ },
+ {
+ "projectCode": TEST_PROJECT_CODE,
+ "definitionCode": TEST_DEFINITION_CODE,
+ "depTaskCode": TEST_TASK_CODE,
+ "cycle": "day",
+ "dateValue": DependentDate.LAST_ONE_DAYS,
+ },
+ ],
+ }
+ ],
+ }
+ for (par_op, chr_op) in itertools.product(
+ TEST_OPERATOR_LIST, TEST_OPERATOR_LIST
+ )
+ ],
+ ),
+ ],
+)
+@patch(
+ "pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway",
+ return_value={
+ "projectCode": TEST_PROJECT_CODE,
+ "processDefinitionCode": TEST_DEFINITION_CODE,
+ "taskDefinitionCode": TEST_TASK_CODE,
+ },
+)
+def test_operator_dependent_task_list_multi_dependent_item(
+ mock_code_info,
+ operators: Tuple[DependentOperator],
+ args: Tuple[Union[Tuple, dict]],
+ expect: List[Dict],
+):
+ """Test DependentOperator(DependentOperator(DependentItem)) single
operator function get_define.
+
+ Here we have test some cases as below. This test case only test single
DependTaskList with one or
+ multiply dependItemList.
+ ```py
+ {
+ "relation": "OR",
+ "dependTaskList": [
+ {
+ "relation": "AND",
+ "dependItemList": [
+ {
+ "projectCode": "project code",
+ "definitionCode": "definition code",
+ "depTaskCode": "dep task code",
+ "cycle": "day",
+ "dateValue": "today"
+ },
+ ...
+ ]
+ },
+ ]
+ }
+ ```
+ """
+ # variable expect_idx record idx should be use to get specific expect
+ expect_idx = 0
+
+ for op_idx, operator in enumerate(operators):
+ dependent_operator = args[0]
+ dependent_item_kwargs = args[1]
+
+ for dop_idx, dpt_op in enumerate(dependent_operator):
+ dependent_item_list = []
+ for dpt_kwargs in dependent_item_kwargs:
+ dpti = DependentItem(**dpt_kwargs)
+ dependent_item_list.append(dpti)
+ child_dep_op = dpt_op(*dependent_item_list)
+ op = operator(child_dep_op)
+ assert expect[expect_idx] == op.get_define()
+ expect_idx += 1
+
+
+def get_dep_task_list(*operator):
+ """Return dependent task list from given operators list."""
+ result = []
+ for op in operator:
+ result.append(
+ {
+ "relation": op.operator_name(),
+ "dependItemList": [
+ {
+ "projectCode": TEST_PROJECT_CODE,
+ "definitionCode": TEST_DEFINITION_CODE,
+ "depTaskCode": TEST_TASK_CODE,
+ "cycle": "month",
+ "dateValue": DependentDate.LAST_MONTH_END,
+ },
+ ],
+ }
+ )
+ return result
+
+
+@pytest.mark.parametrize(
+ # Test dependent operator, Test dependent item parameters, expect operator
define
+ "operators, args, expect",
+ [
+ # Test dependent operator (And | Or) with two dependent task list
+ (
+ (And, Or),
+ (
+ ((And, And), (And, Or), (Or, And), (Or, Or)),
+ {
+ "project_name": TEST_PROJECT,
+ "process_definition_name": TEST_PROCESS_DEFINITION,
+ "dependent_task_name": TEST_TASK,
+ "dependent_date": DependentDate.LAST_MONTH_END,
+ },
+ ),
+ [
+ {
+ "relation": parent_op.operator_name(),
+ "dependTaskList": get_dep_task_list(*child_ops),
+ }
+ for parent_op in (And, Or)
+ for child_ops in ((And, And), (And, Or), (Or, And), (Or, Or))
+ ],
+ ),
+ # Test dependent operator (And | Or) with multiple dependent task list
+ (
+ (And, Or),
+ (
+ ((And, And, And), (And, And, And, And), (And, And, And, And,
And)),
+ {
+ "project_name": TEST_PROJECT,
+ "process_definition_name": TEST_PROCESS_DEFINITION,
+ "dependent_task_name": TEST_TASK,
+ "dependent_date": DependentDate.LAST_MONTH_END,
+ },
+ ),
+ [
+ {
+ "relation": parent_op.operator_name(),
+ "dependTaskList": get_dep_task_list(*child_ops),
+ }
+ for parent_op in (And, Or)
+ for child_ops in (
+ (And, And, And),
+ (And, And, And, And),
+ (And, And, And, And, And),
+ )
+ ],
+ ),
+ ],
+)
+@patch(
+ "pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway",
+ return_value={
+ "projectCode": TEST_PROJECT_CODE,
+ "processDefinitionCode": TEST_DEFINITION_CODE,
+ "taskDefinitionCode": TEST_TASK_CODE,
+ },
+)
+def test_operator_dependent_task_list_multi_dependent_list(
+ mock_code_info,
+ operators: Tuple[DependentOperator],
+ args: Tuple[Union[Tuple, dict]],
+ expect: List[Dict],
+):
+ """Test DependentOperator(DependentOperator(DependentItem)) multiply
operator function get_define.
+
+ Here we have test some cases as below. This test case only test single
DependTaskList with one or
+ multiply dependTaskList.
+ ```py
+ {
+ "relation": "OR",
+ "dependTaskList": [
+ {
+ "relation": "AND",
+ "dependItemList": [
+ {
+ "projectCode": "project code",
+ "definitionCode": "definition code",
+ "depTaskCode": "dep task code",
+ "cycle": "day",
+ "dateValue": "today"
+ }
+ ]
+ },
+ ...
+ ]
+ }
+ ```
+ """
+ # variable expect_idx record idx should be use to get specific expect
+ expect_idx = 0
+ for op_idx, operator in enumerate(operators):
+ dependent_operator = args[0]
+ dependent_item_kwargs = args[1]
+
+ for dop_idx, dpt_ops in enumerate(dependent_operator):
+ dependent_task_list = [
+ dpt_op(DependentItem(**dependent_item_kwargs)) for dpt_op in
dpt_ops
+ ]
+ op = operator(*dependent_task_list)
+ assert (
+ expect[expect_idx] == op.get_define()
+ ), f"Failed with operator syntax {operator}.{dpt_ops}"
+ expect_idx += 1
+
+
+@patch(
+ "pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway",
+ return_value={
+ "projectCode": TEST_PROJECT_CODE,
+ "processDefinitionCode": TEST_DEFINITION_CODE,
+ "taskDefinitionCode": TEST_TASK_CODE,
+ },
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+def test_dependent_get_define(mock_code_version, mock_dep_code):
+ """Test task dependent function get_define."""
+ project_name = "test-dep-project"
+ process_definition_name = "test-dep-definition"
+ dependent_task_name = "test-dep-task"
+ dep_operator = And(
+ Or(
+ # test dependence with add tasks
+ DependentItem(
+ project_name=project_name,
+ process_definition_name=process_definition_name,
+ )
+ ),
+ And(
+ # test dependence with specific task
+ DependentItem(
+ project_name=project_name,
+ process_definition_name=process_definition_name,
+ dependent_task_name=dependent_task_name,
+ )
+ ),
+ )
+
+ name = "test_dependent_get_define"
+ expect = {
+ "code": 123,
+ "name": name,
+ "version": 1,
+ "description": None,
+ "delayTime": 0,
+ "taskType": "DEPENDENT",
+ "taskParams": {
+ "resourceList": [],
+ "localParams": [],
+ "dependence": {
+ "relation": "AND",
+ "dependTaskList": [
+ {
+ "relation": "OR",
+ "dependItemList": [
+ {
+ "projectCode": TEST_PROJECT_CODE,
+ "definitionCode": TEST_DEFINITION_CODE,
+ "depTaskCode": "0",
+ "cycle": "day",
+ "dateValue": "today",
+ }
+ ],
+ },
+ {
+ "relation": "AND",
+ "dependItemList": [
+ {
+ "projectCode": TEST_PROJECT_CODE,
+ "definitionCode": TEST_DEFINITION_CODE,
+ "depTaskCode": TEST_TASK_CODE,
+ "cycle": "day",
+ "dateValue": "today",
+ }
+ ],
+ },
+ ],
+ },
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
+ },
+ "flag": "YES",
+ "taskPriority": "MEDIUM",
+ "workerGroup": "default",
+ "failRetryTimes": 0,
+ "failRetryInterval": 1,
+ "timeoutFlag": "CLOSE",
+ "timeoutNotifyStrategy": None,
+ "timeout": 0,
+ }
+
+ task = Dependent(name, dependence=dep_operator)
+ assert task.get_define() == expect
diff --git
a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
index e7bbab8..bc50852 100644
---
a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
+++
b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
@@ -62,11 +62,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
import
org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.context.annotation.ComponentScan;
import py4j.GatewayServer;
+@SpringBootApplication
@ComponentScan(value = "org.apache.dolphinscheduler")
public class PythonGatewayServer extends SpringBootServletInitializer {
private static final Logger LOGGER =
LoggerFactory.getLogger(PythonGatewayServer.class);
@@ -428,6 +430,41 @@ public class PythonGatewayServer extends
SpringBootServletInitializer {
return result;
}
+ /**
+ * Get project, process definition, task code.
+ * Useful in Python API create dependent task which need processDefinition
information.
+ *
+ * @param projectName project name which process definition
belongs to
+ * @param processDefinitionName process definition name
+ * @param taskName task name
+ */
+ public Map<String, Object> getDependentInfo(String projectName, String
processDefinitionName, String taskName) {
+ Map<String, Object> result = new HashMap<>();
+
+ Project project = projectMapper.queryByName(projectName);
+ if (project == null) {
+ String msg = String.format("Can not find valid project by name
%s", projectName);
+ logger.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+ long projectCode = project.getCode();
+ result.put("projectCode", projectCode);
+
+ ProcessDefinition processDefinition =
processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName);
+ if (processDefinition == null) {
+ String msg = String.format("Can not find valid process definition
by name %s", processDefinitionName);
+ logger.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+ result.put("processDefinitionCode", processDefinition.getCode());
+
+ if (taskName != null) {
+ TaskDefinition taskDefinition =
taskDefinitionMapper.queryByName(projectCode, taskName);
+ result.put("taskDefinitionCode", taskDefinition.getCode());
+ }
+ return result;
+ }
+
@PostConstruct
public void run() {
GatewayServer server = new GatewayServer(this);