This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f66459b5694 Migrate public endpoint Get Tasks to FastAPI (#43980)
f66459b5694 is described below
commit f66459b56942dee66735bd12ced1ccd266f51e09
Author: Omkar P <[email protected]>
AuthorDate: Fri Nov 15 18:43:08 2024 +0530
Migrate public endpoint Get Tasks to FastAPI (#43980)
* Migrate public endpoint Get Tasks to FastAPI
* Re-run static checks
* Add migration marker
* Remove 401 and 403, which are now added by default
* Re-run static checks
---
airflow/api_connexion/endpoints/task_endpoint.py | 1 +
airflow/api_fastapi/common/types.py | 28 ---
airflow/api_fastapi/core_api/datamodels/tasks.py | 47 +++-
.../api_fastapi/core_api/openapi/v1-generated.yaml | 74 ++++++
.../api_fastapi/core_api/routes/public/tasks.py | 39 +++-
airflow/ui/openapi-gen/queries/common.ts | 18 ++
airflow/ui/openapi-gen/queries/prefetch.ts | 23 ++
airflow/ui/openapi-gen/queries/queries.ts | 29 +++
airflow/ui/openapi-gen/queries/suspense.ts | 29 +++
airflow/ui/openapi-gen/requests/schemas.gen.ts | 20 ++
airflow/ui/openapi-gen/requests/services.gen.ts | 33 +++
airflow/ui/openapi-gen/requests/types.gen.ts | 46 ++++
.../core_api/routes/public/test_tasks.py | 254 ++++++++++++++++++++-
13 files changed, 597 insertions(+), 44 deletions(-)
diff --git a/airflow/api_connexion/endpoints/task_endpoint.py
b/airflow/api_connexion/endpoints/task_endpoint.py
index abc28cfee6f..3fd14c8cdf8 100644
--- a/airflow/api_connexion/endpoints/task_endpoint.py
+++ b/airflow/api_connexion/endpoints/task_endpoint.py
@@ -47,6 +47,7 @@ def get_task(*, dag_id: str, task_id: str) -> APIResponse:
return task_schema.dump(task)
+@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.TASK)
def get_tasks(*, dag_id: str, order_by: str = "task_id") -> APIResponse:
"""Get tasks for DAG."""
diff --git a/airflow/api_fastapi/common/types.py
b/airflow/api_fastapi/common/types.py
index 2dc1be7d4cf..ab10a21c970 100644
--- a/airflow/api_fastapi/common/types.py
+++ b/airflow/api_fastapi/common/types.py
@@ -16,14 +16,11 @@
# under the License.
from __future__ import annotations
-import inspect
from datetime import timedelta
from typing import Annotated
from pydantic import AfterValidator, AliasGenerator, AwareDatetime, BaseModel,
BeforeValidator, ConfigDict
-from airflow.models.mappedoperator import MappedOperator
-from airflow.serialization.serialized_objects import SerializedBaseOperator
from airflow.utils import timezone
UtcDateTime = Annotated[AwareDatetime, AfterValidator(lambda d:
d.astimezone(timezone.utc))]
@@ -59,28 +56,3 @@ class TimeDelta(BaseModel):
TimeDeltaWithValidation = Annotated[TimeDelta,
BeforeValidator(_validate_timedelta_field)]
-
-
-def get_class_ref(obj) -> dict[str, str | None]:
- """Return the class_ref dict for obj."""
- is_mapped_or_serialized = isinstance(obj, (MappedOperator,
SerializedBaseOperator))
-
- module_path = None
- if is_mapped_or_serialized:
- module_path = obj._task_module
- else:
- module_type = inspect.getmodule(obj)
- module_path = module_type.__name__ if module_type else None
-
- class_name = None
- if is_mapped_or_serialized:
- class_name = obj._task_type
- elif obj.__class__ is type:
- class_name = obj.__name__
- else:
- class_name = type(obj).__name__
-
- return {
- "module_path": module_path,
- "class_name": class_name,
- }
diff --git a/airflow/api_fastapi/core_api/datamodels/tasks.py
b/airflow/api_fastapi/core_api/datamodels/tasks.py
index 7caaf9c02f4..9b962390cc3 100644
--- a/airflow/api_fastapi/core_api/datamodels/tasks.py
+++ b/airflow/api_fastapi/core_api/datamodels/tasks.py
@@ -17,16 +17,44 @@
from __future__ import annotations
+import inspect
from collections import abc
from datetime import datetime
+from typing import Any
-from pydantic import BaseModel, computed_field, field_validator
+from pydantic import BaseModel, computed_field, field_validator,
model_validator
from airflow.api_fastapi.common.types import TimeDeltaWithValidation
-from airflow.serialization.serialized_objects import
encode_priority_weight_strategy
+from airflow.models.mappedoperator import MappedOperator
+from airflow.serialization.serialized_objects import SerializedBaseOperator,
encode_priority_weight_strategy
from airflow.task.priority_strategy import PriorityWeightStrategy
+def _get_class_ref(obj) -> dict[str, str | None]:
+ """Return the class_ref dict for obj."""
+ is_mapped_or_serialized = isinstance(obj, (MappedOperator,
SerializedBaseOperator))
+
+ module_path = None
+ if is_mapped_or_serialized:
+ module_path = obj._task_module
+ else:
+ module_type = inspect.getmodule(obj)
+ module_path = module_type.__name__ if module_type else None
+
+ class_name = None
+ if is_mapped_or_serialized:
+ class_name = obj._task_type
+ elif obj.__class__ is type:
+ class_name = obj.__name__
+ else:
+ class_name = type(obj).__name__
+
+ return {
+ "module_path": module_path,
+ "class_name": class_name,
+ }
+
+
class TaskResponse(BaseModel):
"""Task serializer for responses."""
@@ -57,6 +85,14 @@ class TaskResponse(BaseModel):
class_ref: dict | None
is_mapped: bool | None
+ @model_validator(mode="before")
+ @classmethod
+ def validate_model(cls, task: Any) -> Any:
+ task.__dict__.update(
+ {"class_ref": _get_class_ref(task), "is_mapped": isinstance(task,
MappedOperator)}
+ )
+ return task
+
@field_validator("weight_rule", mode="before")
@classmethod
def validate_weight_rule(cls, wr: str | PriorityWeightStrategy | None) ->
str | None:
@@ -81,3 +117,10 @@ class TaskResponse(BaseModel):
def extra_links(self) -> list[str]:
"""Extract and return extra_links."""
return getattr(self, "operator_extra_links", [])
+
+
+class TaskCollectionResponse(BaseModel):
+ """Task collection serializer for responses."""
+
+ tasks: list[TaskResponse]
+ total_entries: int
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 997e6ff9203..23d4eeb6e40 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -3204,6 +3204,64 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ /public/dags/{dag_id}/tasks/:
+ get:
+ tags:
+ - Task
+ summary: Get Tasks
+ description: Get tasks for DAG.
+ operationId: get_tasks
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: order_by
+ in: query
+ required: false
+ schema:
+ type: string
+ default: task_id
+ title: Order By
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/TaskCollectionResponse'
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '400':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Bad Request
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/tasks/{task_id}:
get:
tags:
@@ -5546,6 +5604,22 @@ components:
- latest_scheduler_heartbeat
title: SchedulerInfoSchema
description: Schema for Scheduler info.
+ TaskCollectionResponse:
+ properties:
+ tasks:
+ items:
+ $ref: '#/components/schemas/TaskResponse'
+ type: array
+ title: Tasks
+ total_entries:
+ type: integer
+ title: Total Entries
+ type: object
+ required:
+ - tasks
+ - total_entries
+ title: TaskCollectionResponse
+ description: Task collection serializer for responses.
TaskDependencyCollectionResponse:
properties:
dependencies:
diff --git a/airflow/api_fastapi/core_api/routes/public/tasks.py
b/airflow/api_fastapi/core_api/routes/public/tasks.py
index 574d2fc7b78..a8a366cf6df 100644
--- a/airflow/api_fastapi/core_api/routes/public/tasks.py
+++ b/airflow/api_fastapi/core_api/routes/public/tasks.py
@@ -17,26 +17,52 @@
from __future__ import annotations
+from operator import attrgetter
+
from fastapi import HTTPException, Request, status
from airflow.api_fastapi.common.router import AirflowRouter
-from airflow.api_fastapi.common.types import get_class_ref
-from airflow.api_fastapi.core_api.datamodels.tasks import TaskResponse
+from airflow.api_fastapi.core_api.datamodels.tasks import
TaskCollectionResponse, TaskResponse
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
from airflow.exceptions import TaskNotFound
from airflow.models import DAG
-from airflow.models.mappedoperator import MappedOperator
tasks_router = AirflowRouter(tags=["Task"], prefix="/dags/{dag_id}/tasks")
+@tasks_router.get(
+ "/",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_400_BAD_REQUEST,
+ status.HTTP_404_NOT_FOUND,
+ ]
+ ),
+)
+def get_tasks(
+ dag_id: str,
+ request: Request,
+ order_by: str = "task_id",
+) -> TaskCollectionResponse:
+ """Get tasks for DAG."""
+ dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+ if not dag:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id}
was not found")
+ try:
+ tasks = sorted(dag.tasks, key=attrgetter(order_by.lstrip("-")),
reverse=(order_by[0:1] == "-"))
+ except AttributeError as err:
+ raise HTTPException(status.HTTP_400_BAD_REQUEST, str(err))
+ return TaskCollectionResponse(
+ tasks=[TaskResponse.model_validate(task, from_attributes=True) for
task in tasks],
+ total_entries=(len(tasks)),
+ )
+
+
@tasks_router.get(
"/{task_id}",
responses=create_openapi_http_exception_doc(
[
status.HTTP_400_BAD_REQUEST,
- status.HTTP_401_UNAUTHORIZED,
- status.HTTP_403_FORBIDDEN,
status.HTTP_404_NOT_FOUND,
]
),
@@ -48,9 +74,6 @@ def get_task(dag_id: str, task_id, request: Request) ->
TaskResponse:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id}
was not found")
try:
task = dag.get_task(task_id=task_id)
- task.__dict__.update(
- {"class_ref": get_class_ref(task), "is_mapped": isinstance(task,
MappedOperator)}
- )
except TaskNotFound:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Task with id
{task_id} was not found")
return TaskResponse.model_validate(task, from_attributes=True)
diff --git a/airflow/ui/openapi-gen/queries/common.ts
b/airflow/ui/openapi-gen/queries/common.ts
index d1bb5661434..60464a7580e 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -931,6 +931,24 @@ export const UseTaskInstanceServiceGetTaskInstancesKeyFn =
(
},
]),
];
+export type TaskServiceGetTasksDefaultResponse = Awaited<
+ ReturnType<typeof TaskService.getTasks>
+>;
+export type TaskServiceGetTasksQueryResult<
+ TData = TaskServiceGetTasksDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useTaskServiceGetTasksKey = "TaskServiceGetTasks";
+export const UseTaskServiceGetTasksKeyFn = (
+ {
+ dagId,
+ orderBy,
+ }: {
+ dagId: string;
+ orderBy?: string;
+ },
+ queryKey?: Array<unknown>,
+) => [useTaskServiceGetTasksKey, ...(queryKey ?? [{ dagId, orderBy }])];
export type TaskServiceGetTaskDefaultResponse = Awaited<
ReturnType<typeof TaskService.getTask>
>;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts
b/airflow/ui/openapi-gen/queries/prefetch.ts
index d690d87a1b2..ac1cd93db37 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -1264,6 +1264,29 @@ export const
prefetchUseTaskInstanceServiceGetTaskInstances = (
updatedAtLte,
}),
});
+/**
+ * Get Tasks
+ * Get tasks for DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.orderBy
+ * @returns TaskCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseTaskServiceGetTasks = (
+ queryClient: QueryClient,
+ {
+ dagId,
+ orderBy,
+ }: {
+ dagId: string;
+ orderBy?: string;
+ },
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UseTaskServiceGetTasksKeyFn({ dagId, orderBy }),
+ queryFn: () => TaskService.getTasks({ dagId, orderBy }),
+ });
/**
* Get Task
* Get simplified representation of a task.
diff --git a/airflow/ui/openapi-gen/queries/queries.ts
b/airflow/ui/openapi-gen/queries/queries.ts
index dc00175ba4d..68a31b0a7a1 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -1508,6 +1508,35 @@ export const useTaskInstanceServiceGetTaskInstances = <
}) as TData,
...options,
});
+/**
+ * Get Tasks
+ * Get tasks for DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.orderBy
+ * @returns TaskCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useTaskServiceGetTasks = <
+ TData = Common.TaskServiceGetTasksDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ orderBy,
+ }: {
+ dagId: string;
+ orderBy?: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UseTaskServiceGetTasksKeyFn({ dagId, orderBy }, queryKey),
+ queryFn: () => TaskService.getTasks({ dagId, orderBy }) as TData,
+ ...options,
+ });
/**
* Get Task
* Get simplified representation of a task.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts
b/airflow/ui/openapi-gen/queries/suspense.ts
index 9a1d7a81503..0c162cc42cd 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -1492,6 +1492,35 @@ export const
useTaskInstanceServiceGetTaskInstancesSuspense = <
}) as TData,
...options,
});
+/**
+ * Get Tasks
+ * Get tasks for DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.orderBy
+ * @returns TaskCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const useTaskServiceGetTasksSuspense = <
+ TData = Common.TaskServiceGetTasksDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ orderBy,
+ }: {
+ dagId: string;
+ orderBy?: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UseTaskServiceGetTasksKeyFn({ dagId, orderBy }, queryKey),
+ queryFn: () => TaskService.getTasks({ dagId, orderBy }) as TData,
+ ...options,
+ });
/**
* Get Task
* Get simplified representation of a task.
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 8e383146735..61007167cdd 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -2895,6 +2895,26 @@ export const $SchedulerInfoSchema = {
description: "Schema for Scheduler info.",
} as const;
+export const $TaskCollectionResponse = {
+ properties: {
+ tasks: {
+ items: {
+ $ref: "#/components/schemas/TaskResponse",
+ },
+ type: "array",
+ title: "Tasks",
+ },
+ total_entries: {
+ type: "integer",
+ title: "Total Entries",
+ },
+ },
+ type: "object",
+ required: ["tasks", "total_entries"],
+ title: "TaskCollectionResponse",
+ description: "Task collection serializer for responses.",
+} as const;
+
export const $TaskDependencyCollectionResponse = {
properties: {
dependencies: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow/ui/openapi-gen/requests/services.gen.ts
index d5efbd4ab2f..451eac3365d 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -101,6 +101,8 @@ import type {
GetMappedTaskInstanceResponse,
GetTaskInstancesData,
GetTaskInstancesResponse,
+ GetTasksData,
+ GetTasksResponse,
GetTaskData,
GetTaskResponse,
DeleteVariableData,
@@ -1699,6 +1701,37 @@ export class TaskInstanceService {
}
export class TaskService {
+ /**
+ * Get Tasks
+ * Get tasks for DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.orderBy
+ * @returns TaskCollectionResponse Successful Response
+ * @throws ApiError
+ */
+ public static getTasks(
+ data: GetTasksData,
+ ): CancelablePromise<GetTasksResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url: "/public/dags/{dag_id}/tasks/",
+ path: {
+ dag_id: data.dagId,
+ },
+ query: {
+ order_by: data.orderBy,
+ },
+ errors: {
+ 400: "Bad Request",
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
+
/**
* Get Task
* Get simplified representation of a task.
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 91f5ede23d6..0b221ab4ae7 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -717,6 +717,14 @@ export type SchedulerInfoSchema = {
latest_scheduler_heartbeat: string | null;
};
+/**
+ * Task collection serializer for responses.
+ */
+export type TaskCollectionResponse = {
+ tasks: Array<TaskResponse>;
+ total_entries: number;
+};
+
/**
* Task scheduling dependencies collection serializer for responses.
*/
@@ -1392,6 +1400,13 @@ export type GetTaskInstancesData = {
export type GetTaskInstancesResponse = TaskInstanceCollectionResponse;
+export type GetTasksData = {
+ dagId: string;
+ orderBy?: string;
+};
+
+export type GetTasksResponse = TaskCollectionResponse;
+
export type GetTaskData = {
dagId: string;
taskId: unknown;
@@ -2747,6 +2762,37 @@ export type $OpenApiTs = {
};
};
};
+ "/public/dags/{dag_id}/tasks/": {
+ get: {
+ req: GetTasksData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: TaskCollectionResponse;
+ /**
+ * Bad Request
+ */
+ 400: HTTPExceptionResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
"/public/dags/{dag_id}/tasks/{task_id}": {
get: {
req: GetTaskData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_tasks.py
b/tests/api_fastapi/core_api/routes/public/test_tasks.py
index 44d5b984906..a6bff8c9f35 100644
--- a/tests/api_fastapi/core_api/routes/public/test_tasks.py
+++ b/tests/api_fastapi/core_api/routes/public/test_tasks.py
@@ -45,6 +45,7 @@ class TestTaskEndpoint:
unscheduled_task_id2 = "unscheduled_task_2"
task1_start_date = datetime(2020, 6, 15)
task2_start_date = datetime(2020, 6, 16)
+ api_prefix = "/public/dags"
def create_dags(self, test_client):
with DAG(self.dag_id, schedule=None, start_date=self.task1_start_date,
doc_md="details") as dag:
@@ -128,7 +129,7 @@ class TestGetTask(TestTaskEndpoint):
"doc_md": None,
}
response = test_client.get(
- f"/public/dags/{self.dag_id}/tasks/{self.task_id}",
+ f"{self.api_prefix}/{self.dag_id}/tasks/{self.task_id}",
)
assert response.status_code == 200
assert response.json() == expected
@@ -164,7 +165,7 @@ class TestGetTask(TestTaskEndpoint):
"doc_md": None,
}
response = test_client.get(
- f"/public/dags/{self.mapped_dag_id}/tasks/{self.mapped_task_id}",
+
f"{self.api_prefix}/{self.mapped_dag_id}/tasks/{self.mapped_task_id}",
)
assert response.status_code == 200
assert response.json() == expected
@@ -215,7 +216,7 @@ class TestGetTask(TestTaskEndpoint):
}
for task_id, downstream_task_id in downstream_dict.items():
response = test_client.get(
- f"/public/dags/{self.unscheduled_dag_id}/tasks/{task_id}",
+ f"{self.api_prefix}/{self.unscheduled_dag_id}/tasks/{task_id}",
)
assert response.status_code == 200
expected["downstream_task_ids"] = [downstream_task_id] if
downstream_task_id else []
@@ -273,7 +274,7 @@ class TestGetTask(TestTaskEndpoint):
"doc_md": None,
}
response = test_client.get(
- f"/public/dags/{self.dag_id}/tasks/{self.task_id}",
+ f"{self.api_prefix}/{self.dag_id}/tasks/{self.task_id}",
)
assert response.status_code == 200
assert response.json() == expected
@@ -282,13 +283,254 @@ class TestGetTask(TestTaskEndpoint):
def test_should_respond_404(self, test_client):
task_id = "xxxx_not_existing"
response = test_client.get(
- f"/public/dags/{self.dag_id}/tasks/{task_id}",
+ f"{self.api_prefix}/{self.dag_id}/tasks/{task_id}",
)
assert response.status_code == 404
def test_should_respond_404_when_dag_not_found(self, test_client):
dag_id = "xxxx_not_existing"
response = test_client.get(
- f"/public/dags/{dag_id}/tasks/{self.task_id}",
+ f"{self.api_prefix}/{dag_id}/tasks/{self.task_id}",
)
assert response.status_code == 404
+
+
+class TestGetTasks(TestTaskEndpoint):
+ def test_should_respond_200(self, test_client):
+ expected = {
+ "tasks": [
+ {
+ "class_ref": {
+ "class_name": "EmptyOperator",
+ "module_path": "airflow.operators.empty",
+ },
+ "depends_on_past": False,
+ "downstream_task_ids": [self.task_id2],
+ "end_date": None,
+ "execution_timeout": None,
+ "extra_links": [],
+ "operator_name": "EmptyOperator",
+ "owner": "airflow",
+ "params": {
+ "foo": {
+ "__class": "airflow.models.param.Param",
+ "value": "bar",
+ "description": None,
+ "schema": {},
+ }
+ },
+ "pool": "default_pool",
+ "pool_slots": 1.0,
+ "priority_weight": 1.0,
+ "queue": "default",
+ "retries": 0.0,
+ "retry_delay": {"__type": "TimeDelta", "days": 0,
"seconds": 300, "microseconds": 0},
+ "retry_exponential_backoff": False,
+ "start_date": "2020-06-15T00:00:00Z",
+ "task_id": "op1",
+ "task_display_name": "op1",
+ "template_fields": [],
+ "trigger_rule": "all_success",
+ "ui_color": "#e8f7e4",
+ "ui_fgcolor": "#000",
+ "wait_for_downstream": False,
+ "weight_rule": "downstream",
+ "is_mapped": False,
+ "doc_md": None,
+ },
+ {
+ "class_ref": {
+ "class_name": "EmptyOperator",
+ "module_path": "airflow.operators.empty",
+ },
+ "depends_on_past": False,
+ "downstream_task_ids": [],
+ "end_date": None,
+ "execution_timeout": None,
+ "extra_links": [],
+ "operator_name": "EmptyOperator",
+ "owner": "airflow",
+ "params": {},
+ "pool": "default_pool",
+ "pool_slots": 1.0,
+ "priority_weight": 1.0,
+ "queue": "default",
+ "retries": 0.0,
+ "retry_delay": {"__type": "TimeDelta", "days": 0,
"seconds": 300, "microseconds": 0},
+ "retry_exponential_backoff": False,
+ "start_date": "2020-06-16T00:00:00Z",
+ "task_id": self.task_id2,
+ "task_display_name": self.task_id2,
+ "template_fields": [],
+ "trigger_rule": "all_success",
+ "ui_color": "#e8f7e4",
+ "ui_fgcolor": "#000",
+ "wait_for_downstream": False,
+ "weight_rule": "downstream",
+ "is_mapped": False,
+ "doc_md": None,
+ },
+ ],
+ "total_entries": 2,
+ }
+ response = test_client.get(f"{self.api_prefix}/{self.dag_id}/tasks")
+ assert response.status_code == 200
+ assert response.json() == expected
+
+ def test_get_tasks_mapped(self, test_client):
+ expected = {
+ "tasks": [
+ {
+ "class_ref": {"class_name": "EmptyOperator",
"module_path": "airflow.operators.empty"},
+ "depends_on_past": False,
+ "downstream_task_ids": [],
+ "end_date": None,
+ "execution_timeout": None,
+ "extra_links": [],
+ "is_mapped": True,
+ "operator_name": "EmptyOperator",
+ "owner": "airflow",
+ "params": {},
+ "pool": "default_pool",
+ "pool_slots": 1.0,
+ "priority_weight": 1.0,
+ "queue": "default",
+ "retries": 0.0,
+ "retry_delay": {"__type": "TimeDelta", "days": 0,
"microseconds": 0, "seconds": 300},
+ "retry_exponential_backoff": False,
+ "start_date": "2020-06-15T00:00:00Z",
+ "task_id": "mapped_task",
+ "task_display_name": "mapped_task",
+ "template_fields": [],
+ "trigger_rule": "all_success",
+ "ui_color": "#e8f7e4",
+ "ui_fgcolor": "#000",
+ "wait_for_downstream": False,
+ "weight_rule": "downstream",
+ "doc_md": None,
+ },
+ {
+ "class_ref": {
+ "class_name": "EmptyOperator",
+ "module_path": "airflow.operators.empty",
+ },
+ "depends_on_past": False,
+ "downstream_task_ids": [],
+ "end_date": None,
+ "execution_timeout": None,
+ "extra_links": [],
+ "operator_name": "EmptyOperator",
+ "owner": "airflow",
+ "params": {},
+ "pool": "default_pool",
+ "pool_slots": 1.0,
+ "priority_weight": 1.0,
+ "queue": "default",
+ "retries": 0.0,
+ "retry_delay": {"__type": "TimeDelta", "days": 0,
"seconds": 300, "microseconds": 0},
+ "retry_exponential_backoff": False,
+ "start_date": "2020-06-15T00:00:00Z",
+ "task_id": self.task_id3,
+ "task_display_name": self.task_id3,
+ "template_fields": [],
+ "trigger_rule": "all_success",
+ "ui_color": "#e8f7e4",
+ "ui_fgcolor": "#000",
+ "wait_for_downstream": False,
+ "weight_rule": "downstream",
+ "is_mapped": False,
+ "doc_md": None,
+ },
+ ],
+ "total_entries": 2,
+ }
+ response =
test_client.get(f"{self.api_prefix}/{self.mapped_dag_id}/tasks")
+ assert response.status_code == 200
+ assert response.json() == expected
+
+ def test_get_unscheduled_tasks(self, test_client):
+ downstream_dict = {
+ self.unscheduled_task_id1: self.unscheduled_task_id2,
+ self.unscheduled_task_id2: None,
+ }
+ expected = {
+ "tasks": [
+ {
+ "class_ref": {
+ "class_name": "EmptyOperator",
+ "module_path": "airflow.operators.empty",
+ },
+ "depends_on_past": False,
+ "downstream_task_ids": [downstream_task_id] if
downstream_task_id else [],
+ "end_date": None,
+ "execution_timeout": None,
+ "extra_links": [],
+ "operator_name": "EmptyOperator",
+ "owner": "airflow",
+ "params": {
+ "is_unscheduled": {
+ "__class": "airflow.models.param.Param",
+ "value": True,
+ "description": None,
+ "schema": {},
+ }
+ },
+ "pool": "default_pool",
+ "pool_slots": 1.0,
+ "priority_weight": 1.0,
+ "queue": "default",
+ "retries": 0.0,
+ "retry_delay": {"__type": "TimeDelta", "days": 0,
"seconds": 300, "microseconds": 0},
+ "retry_exponential_backoff": False,
+ "start_date": None,
+ "task_id": task_id,
+ "task_display_name": task_id,
+ "template_fields": [],
+ "trigger_rule": "all_success",
+ "ui_color": "#e8f7e4",
+ "ui_fgcolor": "#000",
+ "wait_for_downstream": False,
+ "weight_rule": "downstream",
+ "is_mapped": False,
+ "doc_md": None,
+ }
+ for (task_id, downstream_task_id) in downstream_dict.items()
+ ],
+ "total_entries": len(downstream_dict),
+ }
+ response =
test_client.get(f"{self.api_prefix}/{self.unscheduled_dag_id}/tasks")
+ assert response.status_code == 200
+ assert response.json() == expected
+
+ def test_should_respond_200_ascending_order_by_start_date(self,
test_client):
+ response = test_client.get(
+ f"{self.api_prefix}/{self.dag_id}/tasks?order_by=start_date",
+ )
+ assert response.status_code == 200
+ assert self.task1_start_date < self.task2_start_date
+ assert response.json()["tasks"][0]["task_id"] == self.task_id
+ assert response.json()["tasks"][1]["task_id"] == self.task_id2
+
+ def test_should_respond_200_descending_order_by_start_date(self,
test_client):
+ response = test_client.get(
+ f"{self.api_prefix}/{self.dag_id}/tasks?order_by=-start_date",
+ )
+ assert response.status_code == 200
+ # - means is descending
+ assert self.task1_start_date < self.task2_start_date
+ assert response.json()["tasks"][0]["task_id"] == self.task_id2
+ assert response.json()["tasks"][1]["task_id"] == self.task_id
+
+ def test_should_raise_400_for_invalid_order_by_name(self, test_client):
+ response = test_client.get(
+
f"{self.api_prefix}/{self.dag_id}/tasks?order_by=invalid_task_colume_name",
+ )
+ assert response.status_code == 400
+ assert (
+ response.json()["detail"] == "'EmptyOperator' object has no
attribute 'invalid_task_colume_name'"
+ )
+
+ def test_should_respond_404(self, test_client):
+ dag_id = "xxxx_not_existing"
+ response = test_client.get(f"{self.api_prefix}/{dag_id}/tasks")
+ assert response.status_code == 404