This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 33461d26ed Add Tasks API endpoint tests for DAG without start_date (#40881)
33461d26ed is described below

commit 33461d26ed9b35da72cbdf559e3247a074fc7d4c
Author: Omkar P <[email protected]>
AuthorDate: Fri Jul 26 16:34:03 2024 +0530

    Add Tasks API endpoint tests for DAG without start_date (#40881)
---
 .../api_connexion/endpoints/test_task_endpoint.py  | 125 ++++++++++++++++++++-
 1 file changed, 124 insertions(+), 1 deletion(-)

diff --git a/tests/api_connexion/endpoints/test_task_endpoint.py b/tests/api_connexion/endpoints/test_task_endpoint.py
index 64b7f4a8c5..defbe066c9 100644
--- a/tests/api_connexion/endpoints/test_task_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_endpoint.py
@@ -58,10 +58,13 @@ def configured_app(minimal_app_for_api):
 class TestTaskEndpoint:
     dag_id = "test_dag"
     mapped_dag_id = "test_mapped_task"
+    unscheduled_dag_id = "test_unscheduled_dag"
     task_id = "op1"
     task_id2 = "op2"
     task_id3 = "op3"
     mapped_task_id = "mapped_task"
+    unscheduled_task_id1 = "unscheduled_task_1"
+    unscheduled_task_id2 = "unscheduled_task_2"
     task1_start_date = datetime(2020, 6, 15)
     task2_start_date = datetime(2020, 6, 16)
 
@@ -77,9 +80,18 @@ class TestTaskEndpoint:
             # We don't care about how the operator runs here, only its presence.
             EmptyOperator.partial(task_id=self.mapped_task_id)._expand(EXPAND_INPUT_EMPTY, strict=False)
 
+        with DAG(self.unscheduled_dag_id, start_date=None, schedule=None) as unscheduled_dag:
+            task4 = EmptyOperator(task_id=self.unscheduled_task_id1, params={"is_unscheduled": True})
+            task5 = EmptyOperator(task_id=self.unscheduled_task_id2, params={"is_unscheduled": True})
+
         task1 >> task2
+        task4 >> task5
         dag_bag = DagBag(os.devnull, include_examples=False)
-        dag_bag.dags = {dag.dag_id: dag, mapped_dag.dag_id: mapped_dag}
+        dag_bag.dags = {
+            dag.dag_id: dag,
+            mapped_dag.dag_id: mapped_dag,
+            unscheduled_dag.dag_id: unscheduled_dag,
+        }
         configured_app.dag_bag = dag_bag  # type:ignore
 
     @staticmethod
@@ -182,6 +194,61 @@ class TestGetTask(TestTaskEndpoint):
         assert response.status_code == 200
         assert response.json == expected
 
+    def test_unscheduled_task(self):
+        expected = {
+            "class_ref": {
+                "class_name": "EmptyOperator",
+                "module_path": "airflow.operators.empty",
+            },
+            "depends_on_past": False,
+            "downstream_task_ids": [],
+            "end_date": None,
+            "execution_timeout": None,
+            "extra_links": [],
+            "operator_name": "EmptyOperator",
+            "owner": "airflow",
+            "params": {
+                "is_unscheduled": {
+                    "__class": "airflow.models.param.Param",
+                    "value": True,
+                    "description": None,
+                    "schema": {},
+                }
+            },
+            "pool": "default_pool",
+            "pool_slots": 1.0,
+            "priority_weight": 1.0,
+            "queue": "default",
+            "retries": 0.0,
+            "retry_delay": {"__type": "TimeDelta", "days": 0, "seconds": 300, 
"microseconds": 0},
+            "retry_exponential_backoff": False,
+            "start_date": None,
+            "task_id": None,
+            "task_display_name": None,
+            "template_fields": [],
+            "trigger_rule": "all_success",
+            "ui_color": "#e8f7e4",
+            "ui_fgcolor": "#000",
+            "wait_for_downstream": False,
+            "weight_rule": "downstream",
+            "is_mapped": False,
+            "doc_md": None,
+        }
+        downstream_dict = {
+            self.unscheduled_task_id1: self.unscheduled_task_id2,
+            self.unscheduled_task_id2: None,
+        }
+        for task_id, downstream_task_id in downstream_dict.items():
+            response = self.client.get(
+                f"/api/v1/dags/{self.unscheduled_dag_id}/tasks/{task_id}",
+                environ_overrides={"REMOTE_USER": "test"},
+            )
+            assert response.status_code == 200
+            expected["downstream_task_ids"] = [downstream_task_id] if 
downstream_task_id else []
+            expected["task_id"] = task_id
+            expected["task_display_name"] = task_id
+            assert response.json == expected
+
     def test_should_respond_200_serialized(self):
         # Get the dag out of the dagbag before we patch it to an empty one
         SerializedDagModel.write_dag(self.app.dag_bag.get_dag(self.dag_id))
@@ -420,6 +487,62 @@ class TestGetTasks(TestTaskEndpoint):
         assert response.status_code == 200
         assert response.json == expected
 
+    def test_get_unscheduled_tasks(self):
+        downstream_dict = {
+            self.unscheduled_task_id1: self.unscheduled_task_id2,
+            self.unscheduled_task_id2: None,
+        }
+        expected = {
+            "tasks": [
+                {
+                    "class_ref": {
+                        "class_name": "EmptyOperator",
+                        "module_path": "airflow.operators.empty",
+                    },
+                    "depends_on_past": False,
+                    "downstream_task_ids": [downstream_task_id] if 
downstream_task_id else [],
+                    "end_date": None,
+                    "execution_timeout": None,
+                    "extra_links": [],
+                    "operator_name": "EmptyOperator",
+                    "owner": "airflow",
+                    "params": {
+                        "is_unscheduled": {
+                            "__class": "airflow.models.param.Param",
+                            "value": True,
+                            "description": None,
+                            "schema": {},
+                        }
+                    },
+                    "pool": "default_pool",
+                    "pool_slots": 1.0,
+                    "priority_weight": 1.0,
+                    "queue": "default",
+                    "retries": 0.0,
+                    "retry_delay": {"__type": "TimeDelta", "days": 0, 
"seconds": 300, "microseconds": 0},
+                    "retry_exponential_backoff": False,
+                    "start_date": None,
+                    "task_id": task_id,
+                    "task_display_name": task_id,
+                    "template_fields": [],
+                    "trigger_rule": "all_success",
+                    "ui_color": "#e8f7e4",
+                    "ui_fgcolor": "#000",
+                    "wait_for_downstream": False,
+                    "weight_rule": "downstream",
+                    "is_mapped": False,
+                    "doc_md": None,
+                }
+                for (task_id, downstream_task_id) in downstream_dict.items()
+            ],
+            "total_entries": len(downstream_dict),
+        }
+        response = self.client.get(
+            f"/api/v1/dags/{self.unscheduled_dag_id}/tasks", 
environ_overrides={"REMOTE_USER": "test"}
+        )
+        assert response.status_code == 200
+        assert response.json == expected
+
     def test_should_respond_200_ascending_order_by_start_date(self):
         response = self.client.get(
             f"/api/v1/dags/{self.dag_id}/tasks?order_by=start_date",
