This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f66459b5694 Migrate public endpoint Get Tasks to FastAPI (#43980)
f66459b5694 is described below

commit f66459b56942dee66735bd12ced1ccd266f51e09
Author: Omkar P <[email protected]>
AuthorDate: Fri Nov 15 18:43:08 2024 +0530

    Migrate public endpoint Get Tasks to FastAPI (#43980)
    
    * Migrate public endpoint Get Tasks to FastAPI
    
    * Re-run static checks
    
    * Add migration marker
    
    * Remove 401 and 403, which are now added by default
    
    * Re-run static checks
---
 airflow/api_connexion/endpoints/task_endpoint.py   |   1 +
 airflow/api_fastapi/common/types.py                |  28 ---
 airflow/api_fastapi/core_api/datamodels/tasks.py   |  47 +++-
 .../api_fastapi/core_api/openapi/v1-generated.yaml |  74 ++++++
 .../api_fastapi/core_api/routes/public/tasks.py    |  39 +++-
 airflow/ui/openapi-gen/queries/common.ts           |  18 ++
 airflow/ui/openapi-gen/queries/prefetch.ts         |  23 ++
 airflow/ui/openapi-gen/queries/queries.ts          |  29 +++
 airflow/ui/openapi-gen/queries/suspense.ts         |  29 +++
 airflow/ui/openapi-gen/requests/schemas.gen.ts     |  20 ++
 airflow/ui/openapi-gen/requests/services.gen.ts    |  33 +++
 airflow/ui/openapi-gen/requests/types.gen.ts       |  46 ++++
 .../core_api/routes/public/test_tasks.py           | 254 ++++++++++++++++++++-
 13 files changed, 597 insertions(+), 44 deletions(-)

diff --git a/airflow/api_connexion/endpoints/task_endpoint.py 
b/airflow/api_connexion/endpoints/task_endpoint.py
index abc28cfee6f..3fd14c8cdf8 100644
--- a/airflow/api_connexion/endpoints/task_endpoint.py
+++ b/airflow/api_connexion/endpoints/task_endpoint.py
@@ -47,6 +47,7 @@ def get_task(*, dag_id: str, task_id: str) -> APIResponse:
     return task_schema.dump(task)
 
 
+@mark_fastapi_migration_done
 @security.requires_access_dag("GET", DagAccessEntity.TASK)
 def get_tasks(*, dag_id: str, order_by: str = "task_id") -> APIResponse:
     """Get tasks for DAG."""
diff --git a/airflow/api_fastapi/common/types.py 
b/airflow/api_fastapi/common/types.py
index 2dc1be7d4cf..ab10a21c970 100644
--- a/airflow/api_fastapi/common/types.py
+++ b/airflow/api_fastapi/common/types.py
@@ -16,14 +16,11 @@
 # under the License.
 from __future__ import annotations
 
-import inspect
 from datetime import timedelta
 from typing import Annotated
 
 from pydantic import AfterValidator, AliasGenerator, AwareDatetime, BaseModel, 
BeforeValidator, ConfigDict
 
-from airflow.models.mappedoperator import MappedOperator
-from airflow.serialization.serialized_objects import SerializedBaseOperator
 from airflow.utils import timezone
 
 UtcDateTime = Annotated[AwareDatetime, AfterValidator(lambda d: 
d.astimezone(timezone.utc))]
@@ -59,28 +56,3 @@ class TimeDelta(BaseModel):
 
 
 TimeDeltaWithValidation = Annotated[TimeDelta, 
BeforeValidator(_validate_timedelta_field)]
-
-
-def get_class_ref(obj) -> dict[str, str | None]:
-    """Return the class_ref dict for obj."""
-    is_mapped_or_serialized = isinstance(obj, (MappedOperator, 
SerializedBaseOperator))
-
-    module_path = None
-    if is_mapped_or_serialized:
-        module_path = obj._task_module
-    else:
-        module_type = inspect.getmodule(obj)
-        module_path = module_type.__name__ if module_type else None
-
-    class_name = None
-    if is_mapped_or_serialized:
-        class_name = obj._task_type
-    elif obj.__class__ is type:
-        class_name = obj.__name__
-    else:
-        class_name = type(obj).__name__
-
-    return {
-        "module_path": module_path,
-        "class_name": class_name,
-    }
diff --git a/airflow/api_fastapi/core_api/datamodels/tasks.py 
b/airflow/api_fastapi/core_api/datamodels/tasks.py
index 7caaf9c02f4..9b962390cc3 100644
--- a/airflow/api_fastapi/core_api/datamodels/tasks.py
+++ b/airflow/api_fastapi/core_api/datamodels/tasks.py
@@ -17,16 +17,44 @@
 
 from __future__ import annotations
 
+import inspect
 from collections import abc
 from datetime import datetime
+from typing import Any
 
-from pydantic import BaseModel, computed_field, field_validator
+from pydantic import BaseModel, computed_field, field_validator, 
model_validator
 
 from airflow.api_fastapi.common.types import TimeDeltaWithValidation
-from airflow.serialization.serialized_objects import 
encode_priority_weight_strategy
+from airflow.models.mappedoperator import MappedOperator
+from airflow.serialization.serialized_objects import SerializedBaseOperator, 
encode_priority_weight_strategy
 from airflow.task.priority_strategy import PriorityWeightStrategy
 
 
+def _get_class_ref(obj) -> dict[str, str | None]:
+    """Return the class_ref dict for obj."""
+    is_mapped_or_serialized = isinstance(obj, (MappedOperator, 
SerializedBaseOperator))
+
+    module_path = None
+    if is_mapped_or_serialized:
+        module_path = obj._task_module
+    else:
+        module_type = inspect.getmodule(obj)
+        module_path = module_type.__name__ if module_type else None
+
+    class_name = None
+    if is_mapped_or_serialized:
+        class_name = obj._task_type
+    elif obj.__class__ is type:
+        class_name = obj.__name__
+    else:
+        class_name = type(obj).__name__
+
+    return {
+        "module_path": module_path,
+        "class_name": class_name,
+    }
+
+
 class TaskResponse(BaseModel):
     """Task serializer for responses."""
 
@@ -57,6 +85,14 @@ class TaskResponse(BaseModel):
     class_ref: dict | None
     is_mapped: bool | None
 
+    @model_validator(mode="before")
+    @classmethod
+    def validate_model(cls, task: Any) -> Any:
+        task.__dict__.update(
+            {"class_ref": _get_class_ref(task), "is_mapped": isinstance(task, 
MappedOperator)}
+        )
+        return task
+
     @field_validator("weight_rule", mode="before")
     @classmethod
     def validate_weight_rule(cls, wr: str | PriorityWeightStrategy | None) -> 
str | None:
@@ -81,3 +117,10 @@ class TaskResponse(BaseModel):
     def extra_links(self) -> list[str]:
         """Extract and return extra_links."""
         return getattr(self, "operator_extra_links", [])
+
+
+class TaskCollectionResponse(BaseModel):
+    """Task collection serializer for responses."""
+
+    tasks: list[TaskResponse]
+    total_entries: int
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml 
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 997e6ff9203..23d4eeb6e40 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -3204,6 +3204,64 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
+  /public/dags/{dag_id}/tasks/:
+    get:
+      tags:
+      - Task
+      summary: Get Tasks
+      description: Get tasks for DAG.
+      operationId: get_tasks
+      parameters:
+      - name: dag_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Id
+      - name: order_by
+        in: query
+        required: false
+        schema:
+          type: string
+          default: task_id
+          title: Order By
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/TaskCollectionResponse'
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '400':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Bad Request
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
   /public/dags/{dag_id}/tasks/{task_id}:
     get:
       tags:
@@ -5546,6 +5604,22 @@ components:
       - latest_scheduler_heartbeat
       title: SchedulerInfoSchema
       description: Schema for Scheduler info.
+    TaskCollectionResponse:
+      properties:
+        tasks:
+          items:
+            $ref: '#/components/schemas/TaskResponse'
+          type: array
+          title: Tasks
+        total_entries:
+          type: integer
+          title: Total Entries
+      type: object
+      required:
+      - tasks
+      - total_entries
+      title: TaskCollectionResponse
+      description: Task collection serializer for responses.
     TaskDependencyCollectionResponse:
       properties:
         dependencies:
diff --git a/airflow/api_fastapi/core_api/routes/public/tasks.py 
b/airflow/api_fastapi/core_api/routes/public/tasks.py
index 574d2fc7b78..a8a366cf6df 100644
--- a/airflow/api_fastapi/core_api/routes/public/tasks.py
+++ b/airflow/api_fastapi/core_api/routes/public/tasks.py
@@ -17,26 +17,52 @@
 
 from __future__ import annotations
 
+from operator import attrgetter
+
 from fastapi import HTTPException, Request, status
 
 from airflow.api_fastapi.common.router import AirflowRouter
-from airflow.api_fastapi.common.types import get_class_ref
-from airflow.api_fastapi.core_api.datamodels.tasks import TaskResponse
+from airflow.api_fastapi.core_api.datamodels.tasks import 
TaskCollectionResponse, TaskResponse
 from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
 from airflow.exceptions import TaskNotFound
 from airflow.models import DAG
-from airflow.models.mappedoperator import MappedOperator
 
 tasks_router = AirflowRouter(tags=["Task"], prefix="/dags/{dag_id}/tasks")
 
 
+@tasks_router.get(
+    "/",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_404_NOT_FOUND,
+        ]
+    ),
+)
+def get_tasks(
+    dag_id: str,
+    request: Request,
+    order_by: str = "task_id",
+) -> TaskCollectionResponse:
+    """Get tasks for DAG."""
+    dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+    if not dag:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} 
was not found")
+    try:
+        tasks = sorted(dag.tasks, key=attrgetter(order_by.lstrip("-")), 
reverse=(order_by[0:1] == "-"))
+    except AttributeError as err:
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, str(err))
+    return TaskCollectionResponse(
+        tasks=[TaskResponse.model_validate(task, from_attributes=True) for 
task in tasks],
+        total_entries=(len(tasks)),
+    )
+
+
 @tasks_router.get(
     "/{task_id}",
     responses=create_openapi_http_exception_doc(
         [
             status.HTTP_400_BAD_REQUEST,
-            status.HTTP_401_UNAUTHORIZED,
-            status.HTTP_403_FORBIDDEN,
             status.HTTP_404_NOT_FOUND,
         ]
     ),
@@ -48,9 +74,6 @@ def get_task(dag_id: str, task_id, request: Request) -> 
TaskResponse:
         raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} 
was not found")
     try:
         task = dag.get_task(task_id=task_id)
-        task.__dict__.update(
-            {"class_ref": get_class_ref(task), "is_mapped": isinstance(task, 
MappedOperator)}
-        )
     except TaskNotFound:
         raise HTTPException(status.HTTP_404_NOT_FOUND, f"Task with id 
{task_id} was not found")
     return TaskResponse.model_validate(task, from_attributes=True)
diff --git a/airflow/ui/openapi-gen/queries/common.ts 
b/airflow/ui/openapi-gen/queries/common.ts
index d1bb5661434..60464a7580e 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -931,6 +931,24 @@ export const UseTaskInstanceServiceGetTaskInstancesKeyFn = 
(
     },
   ]),
 ];
+export type TaskServiceGetTasksDefaultResponse = Awaited<
+  ReturnType<typeof TaskService.getTasks>
+>;
+export type TaskServiceGetTasksQueryResult<
+  TData = TaskServiceGetTasksDefaultResponse,
+  TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useTaskServiceGetTasksKey = "TaskServiceGetTasks";
+export const UseTaskServiceGetTasksKeyFn = (
+  {
+    dagId,
+    orderBy,
+  }: {
+    dagId: string;
+    orderBy?: string;
+  },
+  queryKey?: Array<unknown>,
+) => [useTaskServiceGetTasksKey, ...(queryKey ?? [{ dagId, orderBy }])];
 export type TaskServiceGetTaskDefaultResponse = Awaited<
   ReturnType<typeof TaskService.getTask>
 >;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts 
b/airflow/ui/openapi-gen/queries/prefetch.ts
index d690d87a1b2..ac1cd93db37 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -1264,6 +1264,29 @@ export const 
prefetchUseTaskInstanceServiceGetTaskInstances = (
         updatedAtLte,
       }),
   });
+/**
+ * Get Tasks
+ * Get tasks for DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.orderBy
+ * @returns TaskCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseTaskServiceGetTasks = (
+  queryClient: QueryClient,
+  {
+    dagId,
+    orderBy,
+  }: {
+    dagId: string;
+    orderBy?: string;
+  },
+) =>
+  queryClient.prefetchQuery({
+    queryKey: Common.UseTaskServiceGetTasksKeyFn({ dagId, orderBy }),
+    queryFn: () => TaskService.getTasks({ dagId, orderBy }),
+  });
 /**
  * Get Task
  * Get simplified representation of a task.
diff --git a/airflow/ui/openapi-gen/queries/queries.ts 
b/airflow/ui/openapi-gen/queries/queries.ts
index dc00175ba4d..68a31b0a7a1 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -1508,6 +1508,35 @@ export const useTaskInstanceServiceGetTaskInstances = <
       }) as TData,
     ...options,
   });
+/**
+ * Get Tasks
+ * Get tasks for DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.orderBy
+ * @returns TaskCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useTaskServiceGetTasks = <
+  TData = Common.TaskServiceGetTasksDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    dagId,
+    orderBy,
+  }: {
+    dagId: string;
+    orderBy?: string;
+  },
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useQuery<TData, TError>({
+    queryKey: Common.UseTaskServiceGetTasksKeyFn({ dagId, orderBy }, queryKey),
+    queryFn: () => TaskService.getTasks({ dagId, orderBy }) as TData,
+    ...options,
+  });
 /**
  * Get Task
  * Get simplified representation of a task.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts 
b/airflow/ui/openapi-gen/queries/suspense.ts
index 9a1d7a81503..0c162cc42cd 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -1492,6 +1492,35 @@ export const 
useTaskInstanceServiceGetTaskInstancesSuspense = <
       }) as TData,
     ...options,
   });
+/**
+ * Get Tasks
+ * Get tasks for DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.orderBy
+ * @returns TaskCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useTaskServiceGetTasksSuspense = <
+  TData = Common.TaskServiceGetTasksDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    dagId,
+    orderBy,
+  }: {
+    dagId: string;
+    orderBy?: string;
+  },
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useSuspenseQuery<TData, TError>({
+    queryKey: Common.UseTaskServiceGetTasksKeyFn({ dagId, orderBy }, queryKey),
+    queryFn: () => TaskService.getTasks({ dagId, orderBy }) as TData,
+    ...options,
+  });
 /**
  * Get Task
  * Get simplified representation of a task.
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 8e383146735..61007167cdd 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -2895,6 +2895,26 @@ export const $SchedulerInfoSchema = {
   description: "Schema for Scheduler info.",
 } as const;
 
+export const $TaskCollectionResponse = {
+  properties: {
+    tasks: {
+      items: {
+        $ref: "#/components/schemas/TaskResponse",
+      },
+      type: "array",
+      title: "Tasks",
+    },
+    total_entries: {
+      type: "integer",
+      title: "Total Entries",
+    },
+  },
+  type: "object",
+  required: ["tasks", "total_entries"],
+  title: "TaskCollectionResponse",
+  description: "Task collection serializer for responses.",
+} as const;
+
 export const $TaskDependencyCollectionResponse = {
   properties: {
     dependencies: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts 
b/airflow/ui/openapi-gen/requests/services.gen.ts
index d5efbd4ab2f..451eac3365d 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -101,6 +101,8 @@ import type {
   GetMappedTaskInstanceResponse,
   GetTaskInstancesData,
   GetTaskInstancesResponse,
+  GetTasksData,
+  GetTasksResponse,
   GetTaskData,
   GetTaskResponse,
   DeleteVariableData,
@@ -1699,6 +1701,37 @@ export class TaskInstanceService {
 }
 
 export class TaskService {
+  /**
+   * Get Tasks
+   * Get tasks for DAG.
+   * @param data The data for the request.
+   * @param data.dagId
+   * @param data.orderBy
+   * @returns TaskCollectionResponse Successful Response
+   * @throws ApiError
+   */
+  public static getTasks(
+    data: GetTasksData,
+  ): CancelablePromise<GetTasksResponse> {
+    return __request(OpenAPI, {
+      method: "GET",
+      url: "/public/dags/{dag_id}/tasks/",
+      path: {
+        dag_id: data.dagId,
+      },
+      query: {
+        order_by: data.orderBy,
+      },
+      errors: {
+        400: "Bad Request",
+        401: "Unauthorized",
+        403: "Forbidden",
+        404: "Not Found",
+        422: "Validation Error",
+      },
+    });
+  }
+
   /**
    * Get Task
    * Get simplified representation of a task.
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 91f5ede23d6..0b221ab4ae7 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -717,6 +717,14 @@ export type SchedulerInfoSchema = {
   latest_scheduler_heartbeat: string | null;
 };
 
+/**
+ * Task collection serializer for responses.
+ */
+export type TaskCollectionResponse = {
+  tasks: Array<TaskResponse>;
+  total_entries: number;
+};
+
 /**
  * Task scheduling dependencies collection serializer for responses.
  */
@@ -1392,6 +1400,13 @@ export type GetTaskInstancesData = {
 
 export type GetTaskInstancesResponse = TaskInstanceCollectionResponse;
 
+export type GetTasksData = {
+  dagId: string;
+  orderBy?: string;
+};
+
+export type GetTasksResponse = TaskCollectionResponse;
+
 export type GetTaskData = {
   dagId: string;
   taskId: unknown;
@@ -2747,6 +2762,37 @@ export type $OpenApiTs = {
       };
     };
   };
+  "/public/dags/{dag_id}/tasks/": {
+    get: {
+      req: GetTasksData;
+      res: {
+        /**
+         * Successful Response
+         */
+        200: TaskCollectionResponse;
+        /**
+         * Bad Request
+         */
+        400: HTTPExceptionResponse;
+        /**
+         * Unauthorized
+         */
+        401: HTTPExceptionResponse;
+        /**
+         * Forbidden
+         */
+        403: HTTPExceptionResponse;
+        /**
+         * Not Found
+         */
+        404: HTTPExceptionResponse;
+        /**
+         * Validation Error
+         */
+        422: HTTPValidationError;
+      };
+    };
+  };
   "/public/dags/{dag_id}/tasks/{task_id}": {
     get: {
       req: GetTaskData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_tasks.py 
b/tests/api_fastapi/core_api/routes/public/test_tasks.py
index 44d5b984906..a6bff8c9f35 100644
--- a/tests/api_fastapi/core_api/routes/public/test_tasks.py
+++ b/tests/api_fastapi/core_api/routes/public/test_tasks.py
@@ -45,6 +45,7 @@ class TestTaskEndpoint:
     unscheduled_task_id2 = "unscheduled_task_2"
     task1_start_date = datetime(2020, 6, 15)
     task2_start_date = datetime(2020, 6, 16)
+    api_prefix = "/public/dags"
 
     def create_dags(self, test_client):
         with DAG(self.dag_id, schedule=None, start_date=self.task1_start_date, 
doc_md="details") as dag:
@@ -128,7 +129,7 @@ class TestGetTask(TestTaskEndpoint):
             "doc_md": None,
         }
         response = test_client.get(
-            f"/public/dags/{self.dag_id}/tasks/{self.task_id}",
+            f"{self.api_prefix}/{self.dag_id}/tasks/{self.task_id}",
         )
         assert response.status_code == 200
         assert response.json() == expected
@@ -164,7 +165,7 @@ class TestGetTask(TestTaskEndpoint):
             "doc_md": None,
         }
         response = test_client.get(
-            f"/public/dags/{self.mapped_dag_id}/tasks/{self.mapped_task_id}",
+            
f"{self.api_prefix}/{self.mapped_dag_id}/tasks/{self.mapped_task_id}",
         )
         assert response.status_code == 200
         assert response.json() == expected
@@ -215,7 +216,7 @@ class TestGetTask(TestTaskEndpoint):
         }
         for task_id, downstream_task_id in downstream_dict.items():
             response = test_client.get(
-                f"/public/dags/{self.unscheduled_dag_id}/tasks/{task_id}",
+                f"{self.api_prefix}/{self.unscheduled_dag_id}/tasks/{task_id}",
             )
             assert response.status_code == 200
             expected["downstream_task_ids"] = [downstream_task_id] if 
downstream_task_id else []
@@ -273,7 +274,7 @@ class TestGetTask(TestTaskEndpoint):
             "doc_md": None,
         }
         response = test_client.get(
-            f"/public/dags/{self.dag_id}/tasks/{self.task_id}",
+            f"{self.api_prefix}/{self.dag_id}/tasks/{self.task_id}",
         )
         assert response.status_code == 200
         assert response.json() == expected
@@ -282,13 +283,254 @@ class TestGetTask(TestTaskEndpoint):
     def test_should_respond_404(self, test_client):
         task_id = "xxxx_not_existing"
         response = test_client.get(
-            f"/public/dags/{self.dag_id}/tasks/{task_id}",
+            f"{self.api_prefix}/{self.dag_id}/tasks/{task_id}",
         )
         assert response.status_code == 404
 
     def test_should_respond_404_when_dag_not_found(self, test_client):
         dag_id = "xxxx_not_existing"
         response = test_client.get(
-            f"/public/dags/{dag_id}/tasks/{self.task_id}",
+            f"{self.api_prefix}/{dag_id}/tasks/{self.task_id}",
         )
         assert response.status_code == 404
+
+
+class TestGetTasks(TestTaskEndpoint):
+    def test_should_respond_200(self, test_client):
+        expected = {
+            "tasks": [
+                {
+                    "class_ref": {
+                        "class_name": "EmptyOperator",
+                        "module_path": "airflow.operators.empty",
+                    },
+                    "depends_on_past": False,
+                    "downstream_task_ids": [self.task_id2],
+                    "end_date": None,
+                    "execution_timeout": None,
+                    "extra_links": [],
+                    "operator_name": "EmptyOperator",
+                    "owner": "airflow",
+                    "params": {
+                        "foo": {
+                            "__class": "airflow.models.param.Param",
+                            "value": "bar",
+                            "description": None,
+                            "schema": {},
+                        }
+                    },
+                    "pool": "default_pool",
+                    "pool_slots": 1.0,
+                    "priority_weight": 1.0,
+                    "queue": "default",
+                    "retries": 0.0,
+                    "retry_delay": {"__type": "TimeDelta", "days": 0, 
"seconds": 300, "microseconds": 0},
+                    "retry_exponential_backoff": False,
+                    "start_date": "2020-06-15T00:00:00Z",
+                    "task_id": "op1",
+                    "task_display_name": "op1",
+                    "template_fields": [],
+                    "trigger_rule": "all_success",
+                    "ui_color": "#e8f7e4",
+                    "ui_fgcolor": "#000",
+                    "wait_for_downstream": False,
+                    "weight_rule": "downstream",
+                    "is_mapped": False,
+                    "doc_md": None,
+                },
+                {
+                    "class_ref": {
+                        "class_name": "EmptyOperator",
+                        "module_path": "airflow.operators.empty",
+                    },
+                    "depends_on_past": False,
+                    "downstream_task_ids": [],
+                    "end_date": None,
+                    "execution_timeout": None,
+                    "extra_links": [],
+                    "operator_name": "EmptyOperator",
+                    "owner": "airflow",
+                    "params": {},
+                    "pool": "default_pool",
+                    "pool_slots": 1.0,
+                    "priority_weight": 1.0,
+                    "queue": "default",
+                    "retries": 0.0,
+                    "retry_delay": {"__type": "TimeDelta", "days": 0, 
"seconds": 300, "microseconds": 0},
+                    "retry_exponential_backoff": False,
+                    "start_date": "2020-06-16T00:00:00Z",
+                    "task_id": self.task_id2,
+                    "task_display_name": self.task_id2,
+                    "template_fields": [],
+                    "trigger_rule": "all_success",
+                    "ui_color": "#e8f7e4",
+                    "ui_fgcolor": "#000",
+                    "wait_for_downstream": False,
+                    "weight_rule": "downstream",
+                    "is_mapped": False,
+                    "doc_md": None,
+                },
+            ],
+            "total_entries": 2,
+        }
+        response = test_client.get(f"{self.api_prefix}/{self.dag_id}/tasks")
+        assert response.status_code == 200
+        assert response.json() == expected
+
+    def test_get_tasks_mapped(self, test_client):
+        expected = {
+            "tasks": [
+                {
+                    "class_ref": {"class_name": "EmptyOperator", 
"module_path": "airflow.operators.empty"},
+                    "depends_on_past": False,
+                    "downstream_task_ids": [],
+                    "end_date": None,
+                    "execution_timeout": None,
+                    "extra_links": [],
+                    "is_mapped": True,
+                    "operator_name": "EmptyOperator",
+                    "owner": "airflow",
+                    "params": {},
+                    "pool": "default_pool",
+                    "pool_slots": 1.0,
+                    "priority_weight": 1.0,
+                    "queue": "default",
+                    "retries": 0.0,
+                    "retry_delay": {"__type": "TimeDelta", "days": 0, 
"microseconds": 0, "seconds": 300},
+                    "retry_exponential_backoff": False,
+                    "start_date": "2020-06-15T00:00:00Z",
+                    "task_id": "mapped_task",
+                    "task_display_name": "mapped_task",
+                    "template_fields": [],
+                    "trigger_rule": "all_success",
+                    "ui_color": "#e8f7e4",
+                    "ui_fgcolor": "#000",
+                    "wait_for_downstream": False,
+                    "weight_rule": "downstream",
+                    "doc_md": None,
+                },
+                {
+                    "class_ref": {
+                        "class_name": "EmptyOperator",
+                        "module_path": "airflow.operators.empty",
+                    },
+                    "depends_on_past": False,
+                    "downstream_task_ids": [],
+                    "end_date": None,
+                    "execution_timeout": None,
+                    "extra_links": [],
+                    "operator_name": "EmptyOperator",
+                    "owner": "airflow",
+                    "params": {},
+                    "pool": "default_pool",
+                    "pool_slots": 1.0,
+                    "priority_weight": 1.0,
+                    "queue": "default",
+                    "retries": 0.0,
+                    "retry_delay": {"__type": "TimeDelta", "days": 0, 
"seconds": 300, "microseconds": 0},
+                    "retry_exponential_backoff": False,
+                    "start_date": "2020-06-15T00:00:00Z",
+                    "task_id": self.task_id3,
+                    "task_display_name": self.task_id3,
+                    "template_fields": [],
+                    "trigger_rule": "all_success",
+                    "ui_color": "#e8f7e4",
+                    "ui_fgcolor": "#000",
+                    "wait_for_downstream": False,
+                    "weight_rule": "downstream",
+                    "is_mapped": False,
+                    "doc_md": None,
+                },
+            ],
+            "total_entries": 2,
+        }
+        response = 
test_client.get(f"{self.api_prefix}/{self.mapped_dag_id}/tasks")
+        assert response.status_code == 200
+        assert response.json() == expected
+
+    def test_get_unscheduled_tasks(self, test_client):
+        downstream_dict = {
+            self.unscheduled_task_id1: self.unscheduled_task_id2,
+            self.unscheduled_task_id2: None,
+        }
+        expected = {
+            "tasks": [
+                {
+                    "class_ref": {
+                        "class_name": "EmptyOperator",
+                        "module_path": "airflow.operators.empty",
+                    },
+                    "depends_on_past": False,
+                    "downstream_task_ids": [downstream_task_id] if 
downstream_task_id else [],
+                    "end_date": None,
+                    "execution_timeout": None,
+                    "extra_links": [],
+                    "operator_name": "EmptyOperator",
+                    "owner": "airflow",
+                    "params": {
+                        "is_unscheduled": {
+                            "__class": "airflow.models.param.Param",
+                            "value": True,
+                            "description": None,
+                            "schema": {},
+                        }
+                    },
+                    "pool": "default_pool",
+                    "pool_slots": 1.0,
+                    "priority_weight": 1.0,
+                    "queue": "default",
+                    "retries": 0.0,
+                    "retry_delay": {"__type": "TimeDelta", "days": 0, 
"seconds": 300, "microseconds": 0},
+                    "retry_exponential_backoff": False,
+                    "start_date": None,
+                    "task_id": task_id,
+                    "task_display_name": task_id,
+                    "template_fields": [],
+                    "trigger_rule": "all_success",
+                    "ui_color": "#e8f7e4",
+                    "ui_fgcolor": "#000",
+                    "wait_for_downstream": False,
+                    "weight_rule": "downstream",
+                    "is_mapped": False,
+                    "doc_md": None,
+                }
+                for (task_id, downstream_task_id) in downstream_dict.items()
+            ],
+            "total_entries": len(downstream_dict),
+        }
+        response = 
test_client.get(f"{self.api_prefix}/{self.unscheduled_dag_id}/tasks")
+        assert response.status_code == 200
+        assert response.json() == expected
+
+    def test_should_respond_200_ascending_order_by_start_date(self, 
test_client):
+        response = test_client.get(
+            f"{self.api_prefix}/{self.dag_id}/tasks?order_by=start_date",
+        )
+        assert response.status_code == 200
+        assert self.task1_start_date < self.task2_start_date
+        assert response.json()["tasks"][0]["task_id"] == self.task_id
+        assert response.json()["tasks"][1]["task_id"] == self.task_id2
+
+    def test_should_respond_200_descending_order_by_start_date(self, 
test_client):
+        response = test_client.get(
+            f"{self.api_prefix}/{self.dag_id}/tasks?order_by=-start_date",
+        )
+        assert response.status_code == 200
+        # a leading "-" in order_by means descending order
+        assert self.task1_start_date < self.task2_start_date
+        assert response.json()["tasks"][0]["task_id"] == self.task_id2
+        assert response.json()["tasks"][1]["task_id"] == self.task_id
+
+    def test_should_raise_400_for_invalid_order_by_name(self, test_client):
+        response = test_client.get(
+            
f"{self.api_prefix}/{self.dag_id}/tasks?order_by=invalid_task_colume_name",
+        )
+        assert response.status_code == 400
+        assert (
+            response.json()["detail"] == "'EmptyOperator' object has no 
attribute 'invalid_task_colume_name'"
+        )
+
+    def test_should_respond_404(self, test_client):
+        dag_id = "xxxx_not_existing"
+        response = test_client.get(f"{self.api_prefix}/{dag_id}/tasks")
+        assert response.status_code == 404

Reply via email to