This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 33461d26ed Add Tasks API endpoint tests for DAG without start_date (#40881)
33461d26ed is described below
commit 33461d26ed9b35da72cbdf559e3247a074fc7d4c
Author: Omkar P <[email protected]>
AuthorDate: Fri Jul 26 16:34:03 2024 +0530
Add Tasks API endpoint tests for DAG without start_date (#40881)
---
.../api_connexion/endpoints/test_task_endpoint.py | 125 ++++++++++++++++++++-
1 file changed, 124 insertions(+), 1 deletion(-)
diff --git a/tests/api_connexion/endpoints/test_task_endpoint.py b/tests/api_connexion/endpoints/test_task_endpoint.py
index 64b7f4a8c5..defbe066c9 100644
--- a/tests/api_connexion/endpoints/test_task_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_endpoint.py
@@ -58,10 +58,13 @@ def configured_app(minimal_app_for_api):
class TestTaskEndpoint:
dag_id = "test_dag"
mapped_dag_id = "test_mapped_task"
+ unscheduled_dag_id = "test_unscheduled_dag"
task_id = "op1"
task_id2 = "op2"
task_id3 = "op3"
mapped_task_id = "mapped_task"
+ unscheduled_task_id1 = "unscheduled_task_1"
+ unscheduled_task_id2 = "unscheduled_task_2"
task1_start_date = datetime(2020, 6, 15)
task2_start_date = datetime(2020, 6, 16)
@@ -77,9 +80,18 @@ class TestTaskEndpoint:
# We don't care about how the operator runs here, only its presence.
EmptyOperator.partial(task_id=self.mapped_task_id)._expand(EXPAND_INPUT_EMPTY, strict=False)
+ with DAG(self.unscheduled_dag_id, start_date=None, schedule=None) as unscheduled_dag:
+ task4 = EmptyOperator(task_id=self.unscheduled_task_id1, params={"is_unscheduled": True})
+ task5 = EmptyOperator(task_id=self.unscheduled_task_id2, params={"is_unscheduled": True})
+
task1 >> task2
+ task4 >> task5
dag_bag = DagBag(os.devnull, include_examples=False)
- dag_bag.dags = {dag.dag_id: dag, mapped_dag.dag_id: mapped_dag}
+ dag_bag.dags = {
+ dag.dag_id: dag,
+ mapped_dag.dag_id: mapped_dag,
+ unscheduled_dag.dag_id: unscheduled_dag,
+ }
configured_app.dag_bag = dag_bag # type:ignore
@staticmethod
@@ -182,6 +194,61 @@ class TestGetTask(TestTaskEndpoint):
assert response.status_code == 200
assert response.json == expected
+ def test_unscheduled_task(self):
+ expected = {
+ "class_ref": {
+ "class_name": "EmptyOperator",
+ "module_path": "airflow.operators.empty",
+ },
+ "depends_on_past": False,
+ "downstream_task_ids": [],
+ "end_date": None,
+ "execution_timeout": None,
+ "extra_links": [],
+ "operator_name": "EmptyOperator",
+ "owner": "airflow",
+ "params": {
+ "is_unscheduled": {
+ "__class": "airflow.models.param.Param",
+ "value": True,
+ "description": None,
+ "schema": {},
+ }
+ },
+ "pool": "default_pool",
+ "pool_slots": 1.0,
+ "priority_weight": 1.0,
+ "queue": "default",
+ "retries": 0.0,
+ "retry_delay": {"__type": "TimeDelta", "days": 0, "seconds": 300, "microseconds": 0},
+ "retry_exponential_backoff": False,
+ "start_date": None,
+ "task_id": None,
+ "task_display_name": None,
+ "template_fields": [],
+ "trigger_rule": "all_success",
+ "ui_color": "#e8f7e4",
+ "ui_fgcolor": "#000",
+ "wait_for_downstream": False,
+ "weight_rule": "downstream",
+ "is_mapped": False,
+ "doc_md": None,
+ }
+ downstream_dict = {
+ self.unscheduled_task_id1: self.unscheduled_task_id2,
+ self.unscheduled_task_id2: None,
+ }
+ for task_id, downstream_task_id in downstream_dict.items():
+ response = self.client.get(
+ f"/api/v1/dags/{self.unscheduled_dag_id}/tasks/{task_id}",
+ environ_overrides={"REMOTE_USER": "test"},
+ )
+ assert response.status_code == 200
+ expected["downstream_task_ids"] = [downstream_task_id] if downstream_task_id else []
+ expected["task_id"] = task_id
+ expected["task_display_name"] = task_id
+ assert response.json == expected
+
def test_should_respond_200_serialized(self):
# Get the dag out of the dagbag before we patch it to an empty one
SerializedDagModel.write_dag(self.app.dag_bag.get_dag(self.dag_id))
@@ -420,6 +487,62 @@ class TestGetTasks(TestTaskEndpoint):
assert response.status_code == 200
assert response.json == expected
+ def test_get_unscheduled_tasks(self):
+ downstream_dict = {
+ self.unscheduled_task_id1: self.unscheduled_task_id2,
+ self.unscheduled_task_id2: None,
+ }
+ expected = {
+ "tasks": [
+ {
+ "class_ref": {
+ "class_name": "EmptyOperator",
+ "module_path": "airflow.operators.empty",
+ },
+ "depends_on_past": False,
+ "downstream_task_ids": [downstream_task_id] if downstream_task_id else [],
+ "end_date": None,
+ "execution_timeout": None,
+ "extra_links": [],
+ "operator_name": "EmptyOperator",
+ "owner": "airflow",
+ "params": {
+ "is_unscheduled": {
+ "__class": "airflow.models.param.Param",
+ "value": True,
+ "description": None,
+ "schema": {},
+ }
+ },
+ "pool": "default_pool",
+ "pool_slots": 1.0,
+ "priority_weight": 1.0,
+ "queue": "default",
+ "retries": 0.0,
+ "retry_delay": {"__type": "TimeDelta", "days": 0, "seconds": 300, "microseconds": 0},
+ "retry_exponential_backoff": False,
+ "start_date": None,
+ "task_id": task_id,
+ "task_display_name": task_id,
+ "template_fields": [],
+ "trigger_rule": "all_success",
+ "ui_color": "#e8f7e4",
+ "ui_fgcolor": "#000",
+ "wait_for_downstream": False,
+ "weight_rule": "downstream",
+ "is_mapped": False,
+ "doc_md": None,
+ }
+ for (task_id, downstream_task_id) in downstream_dict.items()
+ ],
+ "total_entries": len(downstream_dict),
+ }
+ response = self.client.get(
+ f"/api/v1/dags/{self.unscheduled_dag_id}/tasks", environ_overrides={"REMOTE_USER": "test"}
+ )
+ assert response.status_code == 200
+ assert response.json == expected
+
def test_should_respond_200_ascending_order_by_start_date(self):
response = self.client.get(
f"/api/v1/dags/{self.dag_id}/tasks?order_by=start_date",