This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b985b2579bd AIP-84 Get Task Instance (#43485)
b985b2579bd is described below
commit b985b2579bd0da9163e07edcc3f6e3b5cb7bb2f7
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Thu Oct 31 17:24:19 2024 +0800
AIP-84 Get Task Instance (#43485)
* AIP-84 Get Task Instance
* Add TaskInstanceCollectionResponse
* Add more tests
* Update following code review
---
.../endpoints/task_instance_endpoint.py | 2 +
.../api_fastapi/core_api/openapi/v1-generated.yaml | 933 ++++++++++++++-------
.../api_fastapi/core_api/routes/public/__init__.py | 11 +-
.../core_api/routes/public/task_instances.py | 58 ++
airflow/api_fastapi/core_api/serializers/job.py | 38 +
.../core_api/serializers/task_instances.py | 71 ++
.../api_fastapi/core_api/serializers/trigger.py | 34 +
airflow/ui/openapi-gen/queries/common.ts | 181 ++--
airflow/ui/openapi-gen/queries/prefetch.ts | 188 +++--
airflow/ui/openapi-gen/queries/queries.ts | 379 +++++----
airflow/ui/openapi-gen/queries/suspense.ts | 237 +++---
airflow/ui/openapi-gen/requests/schemas.gen.ts | 552 ++++++++++--
airflow/ui/openapi-gen/requests/services.gen.ts | 366 ++++----
airflow/ui/openapi-gen/requests/types.gen.ts | 396 ++++++---
.../core_api/routes/public/test_task_instances.py | 396 +++++++++
15 files changed, 2751 insertions(+), 1091 deletions(-)
diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py
b/airflow/api_connexion/endpoints/task_instance_endpoint.py
index eff16f54066..2ba236b065b 100644
--- a/airflow/api_connexion/endpoints/task_instance_endpoint.py
+++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py
@@ -52,6 +52,7 @@ from airflow.models.dagrun import DagRun as DR
from airflow.models.taskinstance import TaskInstance as TI,
clear_task_instances
from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
from airflow.utils.airflow_flask_app import get_airflow_app
+from airflow.utils.api_migration import mark_fastapi_migration_done
from airflow.utils.db import get_query_count
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import DagRunState, TaskInstanceState
@@ -69,6 +70,7 @@ if TYPE_CHECKING:
T = TypeVar("T")
+@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@provide_session
def get_task_instance(
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 99f24ffdb72..4295211546f 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -315,6 +315,248 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ /public/dags/{dag_id}/dagRuns/{dag_run_id}:
+ get:
+ tags:
+ - DagRun
+ summary: Get Dag Run
+ operationId: get_dag_run
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: dag_run_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Run Id
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/DAGRunResponse'
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
+ delete:
+ tags:
+ - DagRun
+ summary: Delete Dag Run
+ description: Delete a DAG Run entry.
+ operationId: delete_dag_run
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: dag_run_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Run Id
+ responses:
+ '204':
+ description: Successful Response
+ '400':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Bad Request
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
+ patch:
+ tags:
+ - DagRun
+ summary: Patch Dag Run State
+ description: Modify a DAG Run.
+ operationId: patch_dag_run_state
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: dag_run_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Run Id
+ - name: update_mask
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: array
+ items:
+ type: string
+ - type: 'null'
+ title: Update Mask
+ requestBody:
+ required: true
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/DAGRunPatchBody'
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/DAGRunResponse'
+ '400':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Bad Request
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
+ /public/dagSources/{file_token}:
+ get:
+ tags:
+ - DagSource
+ summary: Get Dag Source
+ description: Get source code using file token.
+ operationId: get_dag_source
+ parameters:
+ - name: file_token
+ in: path
+ required: true
+ schema:
+ type: string
+ title: File Token
+ - name: accept
+ in: header
+ required: false
+ schema:
+ type: string
+ default: '*/*'
+ title: Accept
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/DAGSourceResponse'
+ text/plain:
+ schema:
+ type: string
+ example: dag code
+ '400':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Bad Request
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '406':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Acceptable
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
/public/dags/:
get:
tags:
@@ -812,84 +1054,26 @@ paths:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unprocessable Entity
- /public/dags/{dag_id}/dagRuns/{dag_run_id}:
+ /public/eventLogs/{event_log_id}:
get:
tags:
- - DagRun
- summary: Get Dag Run
- operationId: get_dag_run
+ - Event Log
+ summary: Get Event Log
+ operationId: get_event_log
parameters:
- - name: dag_id
+ - name: event_log_id
in: path
required: true
schema:
- type: string
- title: Dag Id
- - name: dag_run_id
- in: path
- required: true
- schema:
- type: string
- title: Dag Run Id
+ type: integer
+ title: Event Log Id
responses:
'200':
description: Successful Response
content:
application/json:
schema:
- $ref: '#/components/schemas/DAGRunResponse'
- '401':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Unauthorized
- '403':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Forbidden
- '404':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Not Found
- '422':
- description: Validation Error
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPValidationError'
- delete:
- tags:
- - DagRun
- summary: Delete Dag Run
- description: Delete a DAG Run entry.
- operationId: delete_dag_run
- parameters:
- - name: dag_id
- in: path
- required: true
- schema:
- type: string
- title: Dag Id
- - name: dag_run_id
- in: path
- required: true
- schema:
- type: string
- title: Dag Run Id
- responses:
- '204':
- description: Successful Response
- '400':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Bad Request
+ $ref: '#/components/schemas/EventLogResponse'
'401':
content:
application/json:
@@ -914,159 +1098,53 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
- patch:
+ /public/monitor/health:
+ get:
tags:
- - DagRun
- summary: Patch Dag Run State
- description: Modify a DAG Run.
- operationId: patch_dag_run_state
- parameters:
- - name: dag_id
- in: path
- required: true
- schema:
- type: string
- title: Dag Id
- - name: dag_run_id
- in: path
- required: true
- schema:
- type: string
- title: Dag Run Id
- - name: update_mask
- in: query
- required: false
- schema:
- anyOf:
- - type: array
- items:
- type: string
- - type: 'null'
- title: Update Mask
- requestBody:
- required: true
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/DAGRunPatchBody'
+ - Monitor
+ summary: Get Health
+ operationId: get_health
responses:
'200':
description: Successful Response
content:
application/json:
schema:
- $ref: '#/components/schemas/DAGRunResponse'
- '400':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Bad Request
- '401':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Unauthorized
- '403':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Forbidden
- '404':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Not Found
- '422':
- description: Validation Error
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPValidationError'
- /public/dagSources/{file_token}:
+ $ref: '#/components/schemas/HealthInfoSchema'
+ /public/plugins/:
get:
tags:
- - DagSource
- summary: Get Dag Source
- description: Get source code using file token.
- operationId: get_dag_source
+ - Plugin
+ summary: Get Plugins
+ operationId: get_plugins
parameters:
- - name: file_token
- in: path
- required: true
+ - name: limit
+ in: query
+ required: false
schema:
- type: string
- title: File Token
- - name: accept
- in: header
+ type: integer
+ default: 100
+ title: Limit
+ - name: offset
+ in: query
required: false
schema:
- type: string
- default: '*/*'
- title: Accept
+ type: integer
+ default: 0
+ title: Offset
responses:
'200':
description: Successful Response
content:
application/json:
schema:
- $ref: '#/components/schemas/DAGSourceResponse'
- text/plain:
- schema:
- type: string
- example: dag code
- '400':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Bad Request
- '401':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Unauthorized
- '403':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Forbidden
- '404':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Not Found
- '406':
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPExceptionResponse'
- description: Not Acceptable
+ $ref: '#/components/schemas/PluginCollectionResponse'
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
- /public/monitor/health:
- get:
- tags:
- - Monitor
- summary: Get Health
- operationId: get_health
- responses:
- '200':
- description: Successful Response
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HealthInfoSchema'
/public/pools/{pool_name}:
delete:
tags:
@@ -1356,74 +1434,39 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
- /public/plugins/:
+ /public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}:
get:
tags:
- - Plugin
- summary: Get Plugins
- operationId: get_plugins
+ - Task Instance
+ summary: Get Task Instance
+ description: Get task instance.
+ operationId: get_task_instance
parameters:
- - name: limit
- in: query
- required: false
+ - name: dag_id
+ in: path
+ required: true
schema:
- type: integer
- default: 100
- title: Limit
- - name: offset
- in: query
- required: false
+ type: string
+ title: Dag Id
+ - name: dag_run_id
+ in: path
+ required: true
schema:
- type: integer
- default: 0
- title: Offset
- responses:
- '200':
- description: Successful Response
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/PluginCollectionResponse'
- '422':
- description: Validation Error
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/HTTPValidationError'
- /public/version/:
- get:
- tags:
- - Version
- summary: Get Version
- description: Get version information.
- operationId: get_version
- responses:
- '200':
- description: Successful Response
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/VersionInfo'
- /public/eventLogs/{event_log_id}:
- get:
- tags:
- - Event Log
- summary: Get Event Log
- operationId: get_event_log
- parameters:
- - name: event_log_id
+ type: string
+ title: Dag Run Id
+ - name: task_id
in: path
required: true
schema:
- type: integer
- title: Event Log Id
+ type: string
+ title: Task Id
responses:
'200':
description: Successful Response
content:
application/json:
schema:
- $ref: '#/components/schemas/EventLogResponse'
+ $ref: '#/components/schemas/TaskInstanceResponse'
'401':
content:
application/json:
@@ -1690,6 +1733,20 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ /public/version/:
+ get:
+ tags:
+ - Version
+ summary: Get Version
+ description: Get version information.
+ operationId: get_version
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/VersionInfo'
components:
schemas:
AppBuilderMenuItemResponse:
@@ -2757,14 +2814,82 @@ components:
dag_run_states:
$ref: '#/components/schemas/DAGRunStates'
task_instance_states:
- $ref: '#/components/schemas/TaskInstanceState'
+ $ref:
'#/components/schemas/airflow__api_fastapi__core_api__serializers__dashboard__TaskInstanceState'
+ type: object
+ required:
+ - dag_run_types
+ - dag_run_states
+ - task_instance_states
+ title: HistoricalMetricDataResponse
+ description: Historical Metric Data serializer for responses.
+ JobResponse:
+ properties:
+ id:
+ type: integer
+ title: Id
+ dag_id:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Dag Id
+ state:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: State
+ job_type:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Job Type
+ start_date:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Start Date
+ end_date:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: End Date
+ latest_heartbeat:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Latest Heartbeat
+ executor_class:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Executor Class
+ hostname:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Hostname
+ unixname:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Unixname
type: object
required:
- - dag_run_types
- - dag_run_states
- - task_instance_states
- title: HistoricalMetricDataResponse
- description: Historical Metric Data serializer for responses.
+ - id
+ - dag_id
+ - state
+ - job_type
+ - start_date
+ - end_date
+ - latest_heartbeat
+ - executor_class
+ - hostname
+ - unixname
+ title: JobResponse
+ description: Job serializer for responses.
PluginCollectionResponse:
properties:
plugins:
@@ -3019,64 +3144,186 @@ components:
- latest_scheduler_heartbeat
title: SchedulerInfoSchema
description: Schema for Scheduler info.
- TaskInstanceState:
+ TaskInstanceResponse:
properties:
- no_status:
- type: integer
- title: No Status
- removed:
- type: integer
- title: Removed
- scheduled:
- type: integer
- title: Scheduled
- queued:
- type: integer
- title: Queued
- running:
- type: integer
- title: Running
- success:
- type: integer
- title: Success
- restarting:
- type: integer
- title: Restarting
- failed:
- type: integer
- title: Failed
- up_for_retry:
- type: integer
- title: Up For Retry
- up_for_reschedule:
+ task_id:
+ type: string
+ title: Task Id
+ dag_id:
+ type: string
+ title: Dag Id
+ dag_run_id:
+ type: string
+ title: Dag Run Id
+ map_index:
type: integer
- title: Up For Reschedule
- upstream_failed:
+ title: Map Index
+ logical_date:
+ type: string
+ format: date-time
+ title: Logical Date
+ start_date:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Start Date
+ end_date:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: End Date
+ duration:
+ anyOf:
+ - type: number
+ - type: 'null'
+ title: Duration
+ state:
+ anyOf:
+ - $ref:
'#/components/schemas/airflow__utils__state__TaskInstanceState'
+ - type: 'null'
+ try_number:
type: integer
- title: Upstream Failed
- skipped:
+ title: Try Number
+ max_tries:
type: integer
- title: Skipped
- deferred:
+ title: Max Tries
+ task_display_name:
+ type: string
+ title: Task Display Name
+ hostname:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Hostname
+ unixname:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Unixname
+ pool:
+ type: string
+ title: Pool
+ pool_slots:
type: integer
- title: Deferred
+ title: Pool Slots
+ queue:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Queue
+ priority_weight:
+ anyOf:
+ - type: integer
+ - type: 'null'
+ title: Priority Weight
+ operator:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Operator
+ queued_when:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Queued When
+ pid:
+ anyOf:
+ - type: integer
+ - type: 'null'
+ title: Pid
+ executor:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Executor
+ executor_config:
+ type: string
+ title: Executor Config
+ note:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Note
+ rendered_map_index:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Rendered Map Index
+ rendered_fields:
+ type: object
+ title: Rendered Fields
+ default: {}
+ trigger:
+ anyOf:
+ - $ref: '#/components/schemas/TriggerResponse'
+ - type: 'null'
+ triggerer_job:
+ anyOf:
+ - $ref: '#/components/schemas/JobResponse'
+ - type: 'null'
type: object
required:
- - no_status
- - removed
- - scheduled
- - queued
- - running
- - success
- - restarting
- - failed
- - up_for_retry
- - up_for_reschedule
- - upstream_failed
- - skipped
- - deferred
- title: TaskInstanceState
+ - task_id
+ - dag_id
+ - dag_run_id
+ - map_index
+ - logical_date
+ - start_date
+ - end_date
+ - duration
+ - state
+ - try_number
+ - max_tries
+ - task_display_name
+ - hostname
+ - unixname
+ - pool
+ - pool_slots
+ - queue
+ - priority_weight
+ - operator
+ - queued_when
+ - pid
+ - executor
+ - executor_config
+ - note
+ - rendered_map_index
+ - trigger
+ - triggerer_job
+ title: TaskInstanceResponse
description: TaskInstance serializer for responses.
+ TriggerResponse:
+ properties:
+ id:
+ type: integer
+ title: Id
+ classpath:
+ type: string
+ title: Classpath
+ kwargs:
+ type: string
+ title: Kwargs
+ created_date:
+ type: string
+ format: date-time
+ title: Created Date
+ triggerer_id:
+ anyOf:
+ - type: integer
+ - type: 'null'
+ title: Triggerer Id
+ type: object
+ required:
+ - id
+ - classpath
+ - kwargs
+ - created_date
+ - triggerer_id
+ title: TriggerResponse
+ description: Trigger serializer for responses.
TriggererInfoSchema:
properties:
status:
@@ -3192,3 +3439,81 @@ components:
- git_version
title: VersionInfo
description: Version information serializer for responses.
+ airflow__api_fastapi__core_api__serializers__dashboard__TaskInstanceState:
+ properties:
+ no_status:
+ type: integer
+ title: No Status
+ removed:
+ type: integer
+ title: Removed
+ scheduled:
+ type: integer
+ title: Scheduled
+ queued:
+ type: integer
+ title: Queued
+ running:
+ type: integer
+ title: Running
+ success:
+ type: integer
+ title: Success
+ restarting:
+ type: integer
+ title: Restarting
+ failed:
+ type: integer
+ title: Failed
+ up_for_retry:
+ type: integer
+ title: Up For Retry
+ up_for_reschedule:
+ type: integer
+ title: Up For Reschedule
+ upstream_failed:
+ type: integer
+ title: Upstream Failed
+ skipped:
+ type: integer
+ title: Skipped
+ deferred:
+ type: integer
+ title: Deferred
+ type: object
+ required:
+ - no_status
+ - removed
+ - scheduled
+ - queued
+ - running
+ - success
+ - restarting
+ - failed
+ - up_for_retry
+ - up_for_reschedule
+ - upstream_failed
+ - skipped
+ - deferred
+ title: TaskInstanceState
+ description: TaskInstance serializer for responses.
+ airflow__utils__state__TaskInstanceState:
+ type: string
+ enum:
+ - removed
+ - scheduled
+ - queued
+ - running
+ - success
+ - restarting
+ - failed
+ - up_for_retry
+ - up_for_reschedule
+ - upstream_failed
+ - skipped
+ - deferred
+ title: TaskInstanceState
+ description: 'All possible states that a Task Instance can be in.
+
+
+ Note that None is also allowed, so always use this in a type hint with
Optional.'
diff --git a/airflow/api_fastapi/core_api/routes/public/__init__.py
b/airflow/api_fastapi/core_api/routes/public/__init__.py
index 1c2cc698fcd..cc9dd9c5e1b 100644
--- a/airflow/api_fastapi/core_api/routes/public/__init__.py
+++ b/airflow/api_fastapi/core_api/routes/public/__init__.py
@@ -27,6 +27,7 @@ from airflow.api_fastapi.core_api.routes.public.monitor
import monitor_router
from airflow.api_fastapi.core_api.routes.public.plugins import plugins_router
from airflow.api_fastapi.core_api.routes.public.pools import pools_router
from airflow.api_fastapi.core_api.routes.public.providers import
providers_router
+from airflow.api_fastapi.core_api.routes.public.task_instances import
task_instances_router
from airflow.api_fastapi.core_api.routes.public.variables import
variables_router
from airflow.api_fastapi.core_api.routes.public.version import version_router
@@ -34,13 +35,15 @@ public_router = AirflowRouter(prefix="/public")
+# Routers are registered in alphabetical order, each exactly once: including
+# the same router a second time would register all of its routes twice.
public_router.include_router(connections_router)
-public_router.include_router(dags_router)
public_router.include_router(dag_run_router)
public_router.include_router(dag_sources_router)
+public_router.include_router(dags_router)
+public_router.include_router(event_logs_router)
public_router.include_router(monitor_router)
+public_router.include_router(plugins_router)
public_router.include_router(pools_router)
public_router.include_router(providers_router)
-public_router.include_router(plugins_router)
-public_router.include_router(version_router)
-public_router.include_router(event_logs_router)
+public_router.include_router(task_instances_router)
public_router.include_router(variables_router)
+public_router.include_router(version_router)
diff --git a/airflow/api_fastapi/core_api/routes/public/task_instances.py
b/airflow/api_fastapi/core_api/routes/public/task_instances.py
new file mode 100644
index 00000000000..7e0c6fa8941
--- /dev/null
+++ b/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from fastapi import Depends, HTTPException
+from sqlalchemy.orm import Session, joinedload
+from sqlalchemy.sql import select
+from typing_extensions import Annotated
+
+from airflow.api_fastapi.common.db.common import get_session
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.serializers.task_instances import
TaskInstanceResponse
+from airflow.models.taskinstance import TaskInstance as TI
+
+# Router for the task instance endpoints. Every route registered on it is
+# tagged "Task Instance" and nested under one specific DAG and DAG run via
+# the path prefix below.
+task_instances_router = AirflowRouter(
+ tags=["Task Instance"],
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances"
+)
+
+
+# GET <router prefix>/{task_id}: return a single task instance identified by
+# DAG id, DAG run id and task id. 401, 403 and 404 are declared as documented
+# error responses.
+# NOTE: the docstring is deliberately left untouched; the generated OpenAPI
+# spec carries the same text as this operation's description.
+@task_instances_router.get("/{task_id}",
responses=create_openapi_http_exception_doc([401, 403, 404]))
+async def get_task_instance(
+ dag_id: str, dag_run_id: str, task_id: str, session: Annotated[Session,
Depends(get_session)]
+) -> TaskInstanceResponse:
+ """Get task instance."""
+ # Select on the (dag_id, run_id, task_id) triple, join the owning DAG run
+ # and load the rendered-fields relationship together with the row.
+ query = (
+ select(TI)
+ .where(TI.dag_id == dag_id, TI.run_id == dag_run_id, TI.task_id ==
task_id)
+ .join(TI.dag_run)
+ .options(joinedload(TI.rendered_task_instance_fields))
+ )
+
+ # No map_index filter is applied, so this is the first matching row (or
+ # None when nothing matches).
+ task_instance = session.scalar(query)
+
+ if task_instance is None:
+ raise HTTPException(
+ 404,
+ f"The Task Instance with dag_id: `{dag_id}`, run_id:
`{dag_run_id}` and task_id: `{task_id}` was not found",
+ )
+ # A map_index other than -1 is also answered with 404: the message tells
+ # the caller to address mapped task instances with the map_index in the URL.
+ if task_instance.map_index != -1:
+ raise HTTPException(404, "Task instance is mapped, add the map_index
value to the URL")
+
+ # Build the response model by reading attributes off the ORM object.
+ return TaskInstanceResponse.model_validate(task_instance,
from_attributes=True)
diff --git a/airflow/api_fastapi/core_api/serializers/job.py
b/airflow/api_fastapi/core_api/serializers/job.py
new file mode 100644
index 00000000000..e4d5ceb4b4e
--- /dev/null
+++ b/airflow/api_fastapi/core_api/serializers/job.py
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from pydantic import BaseModel, ConfigDict
+
+
+# Response model describing a job. Every field except ``id`` is nullable.
+# NOTE: the class docstring is emitted as the schema description in the
+# generated OpenAPI spec, so it is kept unchanged.
+class JobResponse(BaseModel):
+ """Job serializer for responses."""
+
+ model_config = ConfigDict(populate_by_name=True)
+
+ id: int
+ dag_id: str | None
+ state: str | None
+ job_type: str | None
+ start_date: datetime | None
+ end_date: datetime | None
+ latest_heartbeat: datetime | None
+ # ``executor_class`` names a class, so it is a string. It was previously
+ # typed ``datetime | None``, which made any non-null value fail validation
+ # and advertised ``format: date-time`` in the generated schema.
+ executor_class: str | None
+ hostname: str | None
+ unixname: str | None
diff --git a/airflow/api_fastapi/core_api/serializers/task_instances.py
b/airflow/api_fastapi/core_api/serializers/task_instances.py
new file mode 100644
index 00000000000..b8a10e8fb86
--- /dev/null
+++ b/airflow/api_fastapi/core_api/serializers/task_instances.py
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+from typing import Annotated
+
+from pydantic import AliasPath, BaseModel, BeforeValidator, ConfigDict, Field
+
+from airflow.api_fastapi.core_api.serializers.job import JobResponse
+from airflow.api_fastapi.core_api.serializers.trigger import TriggerResponse
+from airflow.utils.state import TaskInstanceState
+
+
+# Response model for a single task instance. Several fields are stored under
+# one name and exposed under an alias (run_id -> dag_run_id,
+# execution_date -> logical_date, queued_dttm -> queued_when);
+# ``populate_by_name=True`` lets them be populated by field name as well as
+# by alias.
+# NOTE: the class docstring is emitted as the schema description in the
+# generated OpenAPI spec, so it is kept unchanged.
+class TaskInstanceResponse(BaseModel):
+ """TaskInstance serializer for responses."""
+
+ model_config = ConfigDict(populate_by_name=True)
+
+ task_id: str
+ dag_id: str
+ # Exposed as "dag_run_id".
+ run_id: str = Field(alias="dag_run_id")
+ map_index: int
+ # Exposed as "logical_date".
+ execution_date: datetime = Field(alias="logical_date")
+ start_date: datetime | None
+ end_date: datetime | None
+ duration: float | None
+ state: TaskInstanceState | None
+ try_number: int
+ max_tries: int
+ task_display_name: str
+ hostname: str | None
+ unixname: str | None
+ pool: str
+ pool_slots: int
+ queue: str | None
+ priority_weight: int | None
+ operator: str | None
+ # Exposed as "queued_when".
+ queued_dttm: datetime | None = Field(alias="queued_when")
+ pid: int | None
+ executor: str | None
+ # Whatever value is supplied is passed through str() before validation.
+ executor_config: Annotated[str, BeforeValidator(str)]
+ note: str | None
+ rendered_map_index: str | None
+ # Read from the nested path rendered_task_instance_fields.rendered_fields;
+ # defaults to an empty dict.
+ rendered_fields: dict = Field(
+ validation_alias=AliasPath("rendered_task_instance_fields",
"rendered_fields"),
+ default={},
+ )
+ trigger: TriggerResponse | None
+ # NOTE(review): the field is named queued_by_job but aliased as
+ # "triggerer_job" - confirm which job this is meant to expose.
+ queued_by_job: JobResponse | None = Field(alias="triggerer_job")
+
+
+# Envelope for a list of task instances plus an integer count.
+# NOTE(review): total_entries is presumably the total number of matching task
+# instances rather than len(task_instances) - confirm against the endpoint
+# that builds this response (none is visible here).
+class TaskInstanceCollectionResponse(BaseModel):
+ """Task Instance Collection serializer for responses."""
+
+ task_instances: list[TaskInstanceResponse]
+ total_entries: int
diff --git a/airflow/api_fastapi/core_api/serializers/trigger.py
b/airflow/api_fastapi/core_api/serializers/trigger.py
new file mode 100644
index 00000000000..624fa49ab03
--- /dev/null
+++ b/airflow/api_fastapi/core_api/serializers/trigger.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from pydantic import BaseModel, BeforeValidator, ConfigDict
+from typing_extensions import Annotated
+
+
+class TriggerResponse(BaseModel):
+ """Trigger serializer for responses."""
+
+ model_config = ConfigDict(populate_by_name=True)  # NOTE(review): no field below declares an alias, so this setting has no visible effect here
+
+ id: int
+ classpath: str
+ kwargs: Annotated[str, BeforeValidator(str)]  # raw kwargs value is passed through str() before validation
+ created_date: datetime
+ triggerer_id: int | None
diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts
index b05070962f0..875e2b87f39 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -14,6 +14,7 @@ import {
PluginService,
PoolService,
ProviderService,
+ TaskInstanceService,
VariableService,
VersionService,
} from "../requests/services.gen";
@@ -151,6 +152,46 @@ export const UseConnectionServiceGetConnectionsKeyFn = (
useConnectionServiceGetConnectionsKey,
...(queryKey ?? [{ limit, offset, orderBy }]),
];
+export type DagRunServiceGetDagRunDefaultResponse = Awaited<
+ ReturnType<typeof DagRunService.getDagRun>
+>;
+export type DagRunServiceGetDagRunQueryResult<
+ TData = DagRunServiceGetDagRunDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useDagRunServiceGetDagRunKey = "DagRunServiceGetDagRun";
+export const UseDagRunServiceGetDagRunKeyFn = (
+ {
+ dagId,
+ dagRunId,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ },
+ queryKey?: Array<unknown>,
+) => [useDagRunServiceGetDagRunKey, ...(queryKey ?? [{ dagId, dagRunId }])];
+export type DagSourceServiceGetDagSourceDefaultResponse = Awaited<
+ ReturnType<typeof DagSourceService.getDagSource>
+>;
+export type DagSourceServiceGetDagSourceQueryResult<
+ TData = DagSourceServiceGetDagSourceDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useDagSourceServiceGetDagSourceKey =
+ "DagSourceServiceGetDagSource";
+export const UseDagSourceServiceGetDagSourceKeyFn = (
+ {
+ accept,
+ fileToken,
+ }: {
+ accept?: string;
+ fileToken: string;
+ },
+ queryKey?: Array<unknown>,
+) => [
+ useDagSourceServiceGetDagSourceKey,
+ ...(queryKey ?? [{ accept, fileToken }]),
+];
export type DagServiceGetDagsDefaultResponse = Awaited<
ReturnType<typeof DagService.getDags>
>;
@@ -258,46 +299,22 @@ export const UseDagServiceGetDagDetailsKeyFn = (
},
queryKey?: Array<unknown>,
) => [useDagServiceGetDagDetailsKey, ...(queryKey ?? [{ dagId }])];
-export type DagRunServiceGetDagRunDefaultResponse = Awaited<
- ReturnType<typeof DagRunService.getDagRun>
->;
-export type DagRunServiceGetDagRunQueryResult<
- TData = DagRunServiceGetDagRunDefaultResponse,
- TError = unknown,
-> = UseQueryResult<TData, TError>;
-export const useDagRunServiceGetDagRunKey = "DagRunServiceGetDagRun";
-export const UseDagRunServiceGetDagRunKeyFn = (
- {
- dagId,
- dagRunId,
- }: {
- dagId: string;
- dagRunId: string;
- },
- queryKey?: Array<unknown>,
-) => [useDagRunServiceGetDagRunKey, ...(queryKey ?? [{ dagId, dagRunId }])];
-export type DagSourceServiceGetDagSourceDefaultResponse = Awaited<
- ReturnType<typeof DagSourceService.getDagSource>
+export type EventLogServiceGetEventLogDefaultResponse = Awaited<
+ ReturnType<typeof EventLogService.getEventLog>
>;
-export type DagSourceServiceGetDagSourceQueryResult<
- TData = DagSourceServiceGetDagSourceDefaultResponse,
+export type EventLogServiceGetEventLogQueryResult<
+ TData = EventLogServiceGetEventLogDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
-export const useDagSourceServiceGetDagSourceKey =
- "DagSourceServiceGetDagSource";
-export const UseDagSourceServiceGetDagSourceKeyFn = (
+export const useEventLogServiceGetEventLogKey = "EventLogServiceGetEventLog";
+export const UseEventLogServiceGetEventLogKeyFn = (
{
- accept,
- fileToken,
+ eventLogId,
}: {
- accept?: string;
- fileToken: string;
+ eventLogId: number;
},
queryKey?: Array<unknown>,
-) => [
- useDagSourceServiceGetDagSourceKey,
- ...(queryKey ?? [{ accept, fileToken }]),
-];
+) => [useEventLogServiceGetEventLogKey, ...(queryKey ?? [{ eventLogId }])];
export type MonitorServiceGetHealthDefaultResponse = Awaited<
ReturnType<typeof MonitorService.getHealth>
>;
@@ -310,6 +327,24 @@ export const UseMonitorServiceGetHealthKeyFn = (queryKey?: Array<unknown>) => [
useMonitorServiceGetHealthKey,
...(queryKey ?? []),
];
+export type PluginServiceGetPluginsDefaultResponse = Awaited<
+ ReturnType<typeof PluginService.getPlugins>
+>;
+export type PluginServiceGetPluginsQueryResult<
+ TData = PluginServiceGetPluginsDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const usePluginServiceGetPluginsKey = "PluginServiceGetPlugins";
+export const UsePluginServiceGetPluginsKeyFn = (
+ {
+ limit,
+ offset,
+ }: {
+ limit?: number;
+ offset?: number;
+ } = {},
+ queryKey?: Array<unknown>,
+) => [usePluginServiceGetPluginsKey, ...(queryKey ?? [{ limit, offset }])];
export type PoolServiceGetPoolDefaultResponse = Awaited<
ReturnType<typeof PoolService.getPool>
>;
@@ -364,52 +399,30 @@ export const UseProviderServiceGetProvidersKeyFn = (
} = {},
queryKey?: Array<unknown>,
) => [useProviderServiceGetProvidersKey, ...(queryKey ?? [{ limit, offset }])];
-export type PluginServiceGetPluginsDefaultResponse = Awaited<
- ReturnType<typeof PluginService.getPlugins>
->;
-export type PluginServiceGetPluginsQueryResult<
- TData = PluginServiceGetPluginsDefaultResponse,
- TError = unknown,
-> = UseQueryResult<TData, TError>;
-export const usePluginServiceGetPluginsKey = "PluginServiceGetPlugins";
-export const UsePluginServiceGetPluginsKeyFn = (
- {
- limit,
- offset,
- }: {
- limit?: number;
- offset?: number;
- } = {},
- queryKey?: Array<unknown>,
-) => [usePluginServiceGetPluginsKey, ...(queryKey ?? [{ limit, offset }])];
-export type VersionServiceGetVersionDefaultResponse = Awaited<
- ReturnType<typeof VersionService.getVersion>
->;
-export type VersionServiceGetVersionQueryResult<
- TData = VersionServiceGetVersionDefaultResponse,
- TError = unknown,
-> = UseQueryResult<TData, TError>;
-export const useVersionServiceGetVersionKey = "VersionServiceGetVersion";
-export const UseVersionServiceGetVersionKeyFn = (queryKey?: Array<unknown>) => [
- useVersionServiceGetVersionKey,
- ...(queryKey ?? []),
-];
-export type EventLogServiceGetEventLogDefaultResponse = Awaited<
- ReturnType<typeof EventLogService.getEventLog>
+export type TaskInstanceServiceGetTaskInstanceDefaultResponse = Awaited<
+ ReturnType<typeof TaskInstanceService.getTaskInstance>
>;
-export type EventLogServiceGetEventLogQueryResult<
- TData = EventLogServiceGetEventLogDefaultResponse,
+export type TaskInstanceServiceGetTaskInstanceQueryResult<
+ TData = TaskInstanceServiceGetTaskInstanceDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
-export const useEventLogServiceGetEventLogKey = "EventLogServiceGetEventLog";
-export const UseEventLogServiceGetEventLogKeyFn = (
+export const useTaskInstanceServiceGetTaskInstanceKey =
+ "TaskInstanceServiceGetTaskInstance";
+export const UseTaskInstanceServiceGetTaskInstanceKeyFn = (
{
- eventLogId,
+ dagId,
+ dagRunId,
+ taskId,
}: {
- eventLogId: number;
+ dagId: string;
+ dagRunId: string;
+ taskId: string;
},
queryKey?: Array<unknown>,
-) => [useEventLogServiceGetEventLogKey, ...(queryKey ?? [{ eventLogId }])];
+) => [
+ useTaskInstanceServiceGetTaskInstanceKey,
+ ...(queryKey ?? [{ dagId, dagRunId, taskId }]),
+];
export type VariableServiceGetVariableDefaultResponse = Awaited<
ReturnType<typeof VariableService.getVariable>
>;
@@ -449,21 +462,33 @@ export const UseVariableServiceGetVariablesKeyFn = (
useVariableServiceGetVariablesKey,
...(queryKey ?? [{ limit, offset, orderBy }]),
];
+export type VersionServiceGetVersionDefaultResponse = Awaited<
+ ReturnType<typeof VersionService.getVersion>
+>;
+export type VersionServiceGetVersionQueryResult<
+ TData = VersionServiceGetVersionDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useVersionServiceGetVersionKey = "VersionServiceGetVersion";
+export const UseVersionServiceGetVersionKeyFn = (queryKey?: Array<unknown>) => [
+ useVersionServiceGetVersionKey,
+ ...(queryKey ?? []),
+];
export type PoolServicePostPoolMutationResult = Awaited<
ReturnType<typeof PoolService.postPool>
>;
export type VariableServicePostVariableMutationResult = Awaited<
ReturnType<typeof VariableService.postVariable>
>;
+export type DagRunServicePatchDagRunStateMutationResult = Awaited<
+ ReturnType<typeof DagRunService.patchDagRunState>
+>;
export type DagServicePatchDagsMutationResult = Awaited<
ReturnType<typeof DagService.patchDags>
>;
export type DagServicePatchDagMutationResult = Awaited<
ReturnType<typeof DagService.patchDag>
>;
-export type DagRunServicePatchDagRunStateMutationResult = Awaited<
- ReturnType<typeof DagRunService.patchDagRunState>
->;
export type PoolServicePatchPoolMutationResult = Awaited<
ReturnType<typeof PoolService.patchPool>
>;
@@ -473,12 +498,12 @@ export type VariableServicePatchVariableMutationResult = Awaited<
export type ConnectionServiceDeleteConnectionMutationResult = Awaited<
ReturnType<typeof ConnectionService.deleteConnection>
>;
-export type DagServiceDeleteDagMutationResult = Awaited<
- ReturnType<typeof DagService.deleteDag>
->;
export type DagRunServiceDeleteDagRunMutationResult = Awaited<
ReturnType<typeof DagRunService.deleteDagRun>
>;
+export type DagServiceDeleteDagMutationResult = Awaited<
+ ReturnType<typeof DagService.deleteDag>
+>;
export type PoolServiceDeletePoolMutationResult = Awaited<
ReturnType<typeof PoolService.deletePool>
>;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts
index f1924acdbac..795c8770b85 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -14,6 +14,7 @@ import {
PluginService,
PoolService,
ProviderService,
+ TaskInstanceService,
VariableService,
VersionService,
} from "../requests/services.gen";
@@ -185,6 +186,54 @@ export const prefetchUseConnectionServiceGetConnections = (
}),
queryFn: () => ConnectionService.getConnections({ limit, offset, orderBy }),
});
+/**
+ * Get Dag Run
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @returns DAGRunResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseDagRunServiceGetDagRun = (
+ queryClient: QueryClient,
+ {
+ dagId,
+ dagRunId,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ },
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
+ queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }),
+ });
+/**
+ * Get Dag Source
+ * Get source code using file token.
+ * @param data The data for the request.
+ * @param data.fileToken
+ * @param data.accept
+ * @returns DAGSourceResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseDagSourceServiceGetDagSource = (
+ queryClient: QueryClient,
+ {
+ accept,
+ fileToken,
+ }: {
+ accept?: string;
+ fileToken: string;
+ },
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UseDagSourceServiceGetDagSourceKeyFn({
+ accept,
+ fileToken,
+ }),
+ queryFn: () => DagSourceService.getDagSource({ accept, fileToken }),
+ });
/**
* Get Dags
* Get all DAGs.
@@ -331,52 +380,23 @@ export const prefetchUseDagServiceGetDagDetails = (
queryFn: () => DagService.getDagDetails({ dagId }),
});
/**
- * Get Dag Run
- * @param data The data for the request.
- * @param data.dagId
- * @param data.dagRunId
- * @returns DAGRunResponse Successful Response
- * @throws ApiError
- */
-export const prefetchUseDagRunServiceGetDagRun = (
- queryClient: QueryClient,
- {
- dagId,
- dagRunId,
- }: {
- dagId: string;
- dagRunId: string;
- },
-) =>
- queryClient.prefetchQuery({
- queryKey: Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
- queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }),
- });
-/**
- * Get Dag Source
- * Get source code using file token.
+ * Get Event Log
* @param data The data for the request.
- * @param data.fileToken
- * @param data.accept
- * @returns DAGSourceResponse Successful Response
+ * @param data.eventLogId
+ * @returns EventLogResponse Successful Response
* @throws ApiError
*/
-export const prefetchUseDagSourceServiceGetDagSource = (
+export const prefetchUseEventLogServiceGetEventLog = (
queryClient: QueryClient,
{
- accept,
- fileToken,
+ eventLogId,
}: {
- accept?: string;
- fileToken: string;
+ eventLogId: number;
},
) =>
queryClient.prefetchQuery({
- queryKey: Common.UseDagSourceServiceGetDagSourceKeyFn({
- accept,
- fileToken,
- }),
- queryFn: () => DagSourceService.getDagSource({ accept, fileToken }),
+ queryKey: Common.UseEventLogServiceGetEventLogKeyFn({ eventLogId }),
+ queryFn: () => EventLogService.getEventLog({ eventLogId }),
});
/**
* Get Health
@@ -388,6 +408,28 @@ export const prefetchUseMonitorServiceGetHealth = (queryClient: QueryClient) =>
queryKey: Common.UseMonitorServiceGetHealthKeyFn(),
queryFn: () => MonitorService.getHealth(),
});
+/**
+ * Get Plugins
+ * @param data The data for the request.
+ * @param data.limit
+ * @param data.offset
+ * @returns PluginCollectionResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUsePluginServiceGetPlugins = (
+ queryClient: QueryClient,
+ {
+ limit,
+ offset,
+ }: {
+ limit?: number;
+ offset?: number;
+ } = {},
+) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UsePluginServiceGetPluginsKeyFn({ limit, offset }),
+ queryFn: () => PluginService.getPlugins({ limit, offset }),
+ });
/**
* Get Pool
* Get a pool.
@@ -458,56 +500,35 @@ export const prefetchUseProviderServiceGetProviders = (
queryFn: () => ProviderService.getProviders({ limit, offset }),
});
/**
- * Get Plugins
+ * Get Task Instance
+ * Get task instance.
* @param data The data for the request.
- * @param data.limit
- * @param data.offset
- * @returns PluginCollectionResponse Successful Response
- * @throws ApiError
- */
-export const prefetchUsePluginServiceGetPlugins = (
- queryClient: QueryClient,
- {
- limit,
- offset,
- }: {
- limit?: number;
- offset?: number;
- } = {},
-) =>
- queryClient.prefetchQuery({
- queryKey: Common.UsePluginServiceGetPluginsKeyFn({ limit, offset }),
- queryFn: () => PluginService.getPlugins({ limit, offset }),
- });
-/**
- * Get Version
- * Get version information.
- * @returns VersionInfo Successful Response
- * @throws ApiError
- */
-export const prefetchUseVersionServiceGetVersion = (queryClient: QueryClient) =>
- queryClient.prefetchQuery({
- queryKey: Common.UseVersionServiceGetVersionKeyFn(),
- queryFn: () => VersionService.getVersion(),
- });
-/**
- * Get Event Log
- * @param data The data for the request.
- * @param data.eventLogId
- * @returns EventLogResponse Successful Response
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @returns TaskInstanceResponse Successful Response
* @throws ApiError
*/
-export const prefetchUseEventLogServiceGetEventLog = (
+export const prefetchUseTaskInstanceServiceGetTaskInstance = (
queryClient: QueryClient,
{
- eventLogId,
+ dagId,
+ dagRunId,
+ taskId,
}: {
- eventLogId: number;
+ dagId: string;
+ dagRunId: string;
+ taskId: string;
},
) =>
queryClient.prefetchQuery({
- queryKey: Common.UseEventLogServiceGetEventLogKeyFn({ eventLogId }),
- queryFn: () => EventLogService.getEventLog({ eventLogId }),
+ queryKey: Common.UseTaskInstanceServiceGetTaskInstanceKeyFn({
+ dagId,
+ dagRunId,
+ taskId,
+ }),
+ queryFn: () =>
+ TaskInstanceService.getTaskInstance({ dagId, dagRunId, taskId }),
});
/**
* Get Variable
@@ -559,3 +580,14 @@ export const prefetchUseVariableServiceGetVariables = (
}),
queryFn: () => VariableService.getVariables({ limit, offset, orderBy }),
});
+/**
+ * Get Version
+ * Get version information.
+ * @returns VersionInfo Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseVersionServiceGetVersion = (queryClient: QueryClient) =>
+ queryClient.prefetchQuery({
+ queryKey: Common.UseVersionServiceGetVersionKeyFn(),
+ queryFn: () => VersionService.getVersion(),
+ });
diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts
index 3b438c9e996..afb3fddaefe 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -19,6 +19,7 @@ import {
PluginService,
PoolService,
ProviderService,
+ TaskInstanceService,
VariableService,
VersionService,
} from "../requests/services.gen";
@@ -234,6 +235,70 @@ export const useConnectionServiceGetConnections = <
ConnectionService.getConnections({ limit, offset, orderBy }) as TData,
...options,
});
+/**
+ * Get Dag Run
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @returns DAGRunResponse Successful Response
+ * @throws ApiError
+ */
+export const useDagRunServiceGetDagRun = <
+ TData = Common.DagRunServiceGetDagRunDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ dagRunId,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UseDagRunServiceGetDagRunKeyFn(
+ { dagId, dagRunId },
+ queryKey,
+ ),
+ queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData,
+ ...options,
+ });
+/**
+ * Get Dag Source
+ * Get source code using file token.
+ * @param data The data for the request.
+ * @param data.fileToken
+ * @param data.accept
+ * @returns DAGSourceResponse Successful Response
+ * @throws ApiError
+ */
+export const useDagSourceServiceGetDagSource = <
+ TData = Common.DagSourceServiceGetDagSourceDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ accept,
+ fileToken,
+ }: {
+ accept?: string;
+ fileToken: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UseDagSourceServiceGetDagSourceKeyFn(
+ { accept, fileToken },
+ queryKey,
+ ),
+ queryFn: () =>
+ DagSourceService.getDagSource({ accept, fileToken }) as TData,
+ ...options,
+ });
/**
* Get Dags
* Get all DAGs.
@@ -410,85 +475,80 @@ export const useDagServiceGetDagDetails = <
...options,
});
/**
- * Get Dag Run
+ * Get Event Log
* @param data The data for the request.
- * @param data.dagId
- * @param data.dagRunId
- * @returns DAGRunResponse Successful Response
+ * @param data.eventLogId
+ * @returns EventLogResponse Successful Response
* @throws ApiError
*/
-export const useDagRunServiceGetDagRun = <
- TData = Common.DagRunServiceGetDagRunDefaultResponse,
+export const useEventLogServiceGetEventLog = <
+ TData = Common.EventLogServiceGetEventLogDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
- dagId,
- dagRunId,
+ eventLogId,
}: {
- dagId: string;
- dagRunId: string;
+ eventLogId: number;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
- queryKey: Common.UseDagRunServiceGetDagRunKeyFn(
- { dagId, dagRunId },
+ queryKey: Common.UseEventLogServiceGetEventLogKeyFn(
+ { eventLogId },
queryKey,
),
- queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData,
+ queryFn: () => EventLogService.getEventLog({ eventLogId }) as TData,
...options,
});
/**
- * Get Dag Source
- * Get source code using file token.
- * @param data The data for the request.
- * @param data.fileToken
- * @param data.accept
- * @returns DAGSourceResponse Successful Response
+ * Get Health
+ * @returns HealthInfoSchema Successful Response
* @throws ApiError
*/
-export const useDagSourceServiceGetDagSource = <
- TData = Common.DagSourceServiceGetDagSourceDefaultResponse,
+export const useMonitorServiceGetHealth = <
+ TData = Common.MonitorServiceGetHealthDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
- {
- accept,
- fileToken,
- }: {
- accept?: string;
- fileToken: string;
- },
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
- queryKey: Common.UseDagSourceServiceGetDagSourceKeyFn(
- { accept, fileToken },
- queryKey,
- ),
- queryFn: () =>
- DagSourceService.getDagSource({ accept, fileToken }) as TData,
+ queryKey: Common.UseMonitorServiceGetHealthKeyFn(queryKey),
+ queryFn: () => MonitorService.getHealth() as TData,
...options,
});
/**
- * Get Health
- * @returns HealthInfoSchema Successful Response
+ * Get Plugins
+ * @param data The data for the request.
+ * @param data.limit
+ * @param data.offset
+ * @returns PluginCollectionResponse Successful Response
* @throws ApiError
*/
-export const useMonitorServiceGetHealth = <
- TData = Common.MonitorServiceGetHealthDefaultResponse,
+export const usePluginServiceGetPlugins = <
+ TData = Common.PluginServiceGetPluginsDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
+ {
+ limit,
+ offset,
+ }: {
+ limit?: number;
+ offset?: number;
+ } = {},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
- queryKey: Common.UseMonitorServiceGetHealthKeyFn(queryKey),
- queryFn: () => MonitorService.getHealth() as TData,
+ queryKey: Common.UsePluginServiceGetPluginsKeyFn(
+ { limit, offset },
+ queryKey,
+ ),
+ queryFn: () => PluginService.getPlugins({ limit, offset }) as TData,
...options,
});
/**
@@ -585,81 +645,39 @@ export const useProviderServiceGetProviders = <
...options,
});
/**
- * Get Plugins
+ * Get Task Instance
+ * Get task instance.
* @param data The data for the request.
- * @param data.limit
- * @param data.offset
- * @returns PluginCollectionResponse Successful Response
- * @throws ApiError
- */
-export const usePluginServiceGetPlugins = <
- TData = Common.PluginServiceGetPluginsDefaultResponse,
- TError = unknown,
- TQueryKey extends Array<unknown> = unknown[],
->(
- {
- limit,
- offset,
- }: {
- limit?: number;
- offset?: number;
- } = {},
- queryKey?: TQueryKey,
- options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
-) =>
- useQuery<TData, TError>({
- queryKey: Common.UsePluginServiceGetPluginsKeyFn(
- { limit, offset },
- queryKey,
- ),
- queryFn: () => PluginService.getPlugins({ limit, offset }) as TData,
- ...options,
- });
-/**
- * Get Version
- * Get version information.
- * @returns VersionInfo Successful Response
- * @throws ApiError
- */
-export const useVersionServiceGetVersion = <
- TData = Common.VersionServiceGetVersionDefaultResponse,
- TError = unknown,
- TQueryKey extends Array<unknown> = unknown[],
->(
- queryKey?: TQueryKey,
- options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
-) =>
- useQuery<TData, TError>({
- queryKey: Common.UseVersionServiceGetVersionKeyFn(queryKey),
- queryFn: () => VersionService.getVersion() as TData,
- ...options,
- });
-/**
- * Get Event Log
- * @param data The data for the request.
- * @param data.eventLogId
- * @returns EventLogResponse Successful Response
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @returns TaskInstanceResponse Successful Response
* @throws ApiError
*/
-export const useEventLogServiceGetEventLog = <
- TData = Common.EventLogServiceGetEventLogDefaultResponse,
+export const useTaskInstanceServiceGetTaskInstance = <
+ TData = Common.TaskInstanceServiceGetTaskInstanceDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
- eventLogId,
+ dagId,
+ dagRunId,
+ taskId,
}: {
- eventLogId: number;
+ dagId: string;
+ dagRunId: string;
+ taskId: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
- queryKey: Common.UseEventLogServiceGetEventLogKeyFn(
- { eventLogId },
+ queryKey: Common.UseTaskInstanceServiceGetTaskInstanceKeyFn(
+ { dagId, dagRunId, taskId },
queryKey,
),
- queryFn: () => EventLogService.getEventLog({ eventLogId }) as TData,
+ queryFn: () =>
+ TaskInstanceService.getTaskInstance({ dagId, dagRunId, taskId }) as TData,
...options,
});
/**
@@ -727,6 +745,25 @@ export const useVariableServiceGetVariables = <
VariableService.getVariables({ limit, offset, orderBy }) as TData,
...options,
});
+/**
+ * Get Version
+ * Get version information.
+ * @returns VersionInfo Successful Response
+ * @throws ApiError
+ */
+export const useVersionServiceGetVersion = <
+ TData = Common.VersionServiceGetVersionDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey: Common.UseVersionServiceGetVersionKeyFn(queryKey),
+ queryFn: () => VersionService.getVersion() as TData,
+ ...options,
+ });
/**
* Post Pool
* Create a Pool.
@@ -803,6 +840,57 @@ export const useVariableServicePostVariable = <
}) as unknown as Promise<TData>,
...options,
});
+/**
+ * Patch Dag Run State
+ * Modify a DAG Run.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.requestBody
+ * @param data.updateMask
+ * @returns DAGRunResponse Successful Response
+ * @throws ApiError
+ */
+export const useDagRunServicePatchDagRunState = <
+ TData = Common.DagRunServicePatchDagRunStateMutationResult,
+ TError = unknown,
+ TContext = unknown,
+>(
+ options?: Omit<
+ UseMutationOptions<
+ TData,
+ TError,
+ {
+ dagId: string;
+ dagRunId: string;
+ requestBody: DAGRunPatchBody;
+ updateMask?: string[];
+ },
+ TContext
+ >,
+ "mutationFn"
+ >,
+) =>
+ useMutation<
+ TData,
+ TError,
+ {
+ dagId: string;
+ dagRunId: string;
+ requestBody: DAGRunPatchBody;
+ updateMask?: string[];
+ },
+ TContext
+ >({
+ mutationFn: ({ dagId, dagRunId, requestBody, updateMask }) =>
+ DagRunService.patchDagRunState({
+ dagId,
+ dagRunId,
+ requestBody,
+ updateMask,
+ }) as unknown as Promise<TData>,
+ ...options,
+ });
/**
* Patch Dags
* Patch multiple DAGs.
@@ -936,57 +1024,6 @@ export const useDagServicePatchDag = <
}) as unknown as Promise<TData>,
...options,
});
-/**
- * Patch Dag Run State
- * Modify a DAG Run.
- * @param data The data for the request.
- * @param data.dagId
- * @param data.dagRunId
- * @param data.requestBody
- * @param data.updateMask
- * @returns DAGRunResponse Successful Response
- * @throws ApiError
- */
-export const useDagRunServicePatchDagRunState = <
- TData = Common.DagRunServicePatchDagRunStateMutationResult,
- TError = unknown,
- TContext = unknown,
->(
- options?: Omit<
- UseMutationOptions<
- TData,
- TError,
- {
- dagId: string;
- dagRunId: string;
- requestBody: DAGRunPatchBody;
- updateMask?: string[];
- },
- TContext
- >,
- "mutationFn"
- >,
-) =>
- useMutation<
- TData,
- TError,
- {
- dagId: string;
- dagRunId: string;
- requestBody: DAGRunPatchBody;
- updateMask?: string[];
- },
- TContext
- >({
- mutationFn: ({ dagId, dagRunId, requestBody, updateMask }) =>
- DagRunService.patchDagRunState({
- dagId,
- dagRunId,
- requestBody,
- updateMask,
- }) as unknown as Promise<TData>,
- ...options,
- });
/**
* Patch Pool
* Update a Pool.
@@ -1121,15 +1158,16 @@ export const useConnectionServiceDeleteConnection = <
...options,
});
/**
- * Delete Dag
- * Delete the specific DAG.
+ * Delete Dag Run
+ * Delete a DAG Run entry.
* @param data The data for the request.
* @param data.dagId
- * @returns unknown Successful Response
+ * @param data.dagRunId
+ * @returns void Successful Response
* @throws ApiError
*/
-export const useDagServiceDeleteDag = <
- TData = Common.DagServiceDeleteDagMutationResult,
+export const useDagRunServiceDeleteDagRun = <
+ TData = Common.DagRunServiceDeleteDagRunMutationResult,
TError = unknown,
TContext = unknown,
>(
@@ -1139,6 +1177,7 @@ export const useDagServiceDeleteDag = <
TError,
{
dagId: string;
+ dagRunId: string;
},
TContext
>,
@@ -1150,24 +1189,27 @@ export const useDagServiceDeleteDag = <
TError,
{
dagId: string;
+ dagRunId: string;
},
TContext
>({
- mutationFn: ({ dagId }) =>
- DagService.deleteDag({ dagId }) as unknown as Promise<TData>,
+ mutationFn: ({ dagId, dagRunId }) =>
+ DagRunService.deleteDagRun({
+ dagId,
+ dagRunId,
+ }) as unknown as Promise<TData>,
...options,
});
/**
- * Delete Dag Run
- * Delete a DAG Run entry.
+ * Delete Dag
+ * Delete the specific DAG.
* @param data The data for the request.
* @param data.dagId
- * @param data.dagRunId
- * @returns void Successful Response
+ * @returns unknown Successful Response
* @throws ApiError
*/
-export const useDagRunServiceDeleteDagRun = <
- TData = Common.DagRunServiceDeleteDagRunMutationResult,
+export const useDagServiceDeleteDag = <
+ TData = Common.DagServiceDeleteDagMutationResult,
TError = unknown,
TContext = unknown,
>(
@@ -1177,7 +1219,6 @@ export const useDagRunServiceDeleteDagRun = <
TError,
{
dagId: string;
- dagRunId: string;
},
TContext
>,
@@ -1189,15 +1230,11 @@ export const useDagRunServiceDeleteDagRun = <
TError,
{
dagId: string;
- dagRunId: string;
},
TContext
>({
- mutationFn: ({ dagId, dagRunId }) =>
- DagRunService.deleteDagRun({
- dagId,
- dagRunId,
- }) as unknown as Promise<TData>,
+ mutationFn: ({ dagId }) =>
+ DagService.deleteDag({ dagId }) as unknown as Promise<TData>,
...options,
});
/**
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts
index 9af6582f88d..ab8dfbabcc0 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -14,6 +14,7 @@ import {
PluginService,
PoolService,
ProviderService,
+ TaskInstanceService,
VariableService,
VersionService,
} from "../requests/services.gen";
@@ -222,6 +223,70 @@ export const useConnectionServiceGetConnectionsSuspense = <
ConnectionService.getConnections({ limit, offset, orderBy }) as TData,
...options,
});
+/**
+ * Get Dag Run
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @returns DAGRunResponse Successful Response
+ * @throws ApiError
+ */
+export const useDagRunServiceGetDagRunSuspense = <
+ TData = Common.DagRunServiceGetDagRunDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ dagRunId,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UseDagRunServiceGetDagRunKeyFn(
+ { dagId, dagRunId },
+ queryKey,
+ ),
+ queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData,
+ ...options,
+ });
+/**
+ * Get Dag Source
+ * Get source code using file token.
+ * @param data The data for the request.
+ * @param data.fileToken
+ * @param data.accept
+ * @returns DAGSourceResponse Successful Response
+ * @throws ApiError
+ */
+export const useDagSourceServiceGetDagSourceSuspense = <
+ TData = Common.DagSourceServiceGetDagSourceDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ accept,
+ fileToken,
+ }: {
+ accept?: string;
+ fileToken: string;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UseDagSourceServiceGetDagSourceKeyFn(
+ { accept, fileToken },
+ queryKey,
+ ),
+ queryFn: () =>
+ DagSourceService.getDagSource({ accept, fileToken }) as TData,
+ ...options,
+ });
/**
* Get Dags
* Get all DAGs.
@@ -398,85 +463,80 @@ export const useDagServiceGetDagDetailsSuspense = <
...options,
});
/**
- * Get Dag Run
+ * Get Event Log
* @param data The data for the request.
- * @param data.dagId
- * @param data.dagRunId
- * @returns DAGRunResponse Successful Response
+ * @param data.eventLogId
+ * @returns EventLogResponse Successful Response
* @throws ApiError
*/
-export const useDagRunServiceGetDagRunSuspense = <
- TData = Common.DagRunServiceGetDagRunDefaultResponse,
+export const useEventLogServiceGetEventLogSuspense = <
+ TData = Common.EventLogServiceGetEventLogDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
- dagId,
- dagRunId,
+ eventLogId,
}: {
- dagId: string;
- dagRunId: string;
+ eventLogId: number;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useSuspenseQuery<TData, TError>({
- queryKey: Common.UseDagRunServiceGetDagRunKeyFn(
- { dagId, dagRunId },
+ queryKey: Common.UseEventLogServiceGetEventLogKeyFn(
+ { eventLogId },
queryKey,
),
- queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData,
+ queryFn: () => EventLogService.getEventLog({ eventLogId }) as TData,
...options,
});
/**
- * Get Dag Source
- * Get source code using file token.
- * @param data The data for the request.
- * @param data.fileToken
- * @param data.accept
- * @returns DAGSourceResponse Successful Response
+ * Get Health
+ * @returns HealthInfoSchema Successful Response
* @throws ApiError
*/
-export const useDagSourceServiceGetDagSourceSuspense = <
- TData = Common.DagSourceServiceGetDagSourceDefaultResponse,
+export const useMonitorServiceGetHealthSuspense = <
+ TData = Common.MonitorServiceGetHealthDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
- {
- accept,
- fileToken,
- }: {
- accept?: string;
- fileToken: string;
- },
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useSuspenseQuery<TData, TError>({
- queryKey: Common.UseDagSourceServiceGetDagSourceKeyFn(
- { accept, fileToken },
- queryKey,
- ),
- queryFn: () =>
- DagSourceService.getDagSource({ accept, fileToken }) as TData,
+ queryKey: Common.UseMonitorServiceGetHealthKeyFn(queryKey),
+ queryFn: () => MonitorService.getHealth() as TData,
...options,
});
/**
- * Get Health
- * @returns HealthInfoSchema Successful Response
+ * Get Plugins
+ * @param data The data for the request.
+ * @param data.limit
+ * @param data.offset
+ * @returns PluginCollectionResponse Successful Response
* @throws ApiError
*/
-export const useMonitorServiceGetHealthSuspense = <
- TData = Common.MonitorServiceGetHealthDefaultResponse,
+export const usePluginServiceGetPluginsSuspense = <
+ TData = Common.PluginServiceGetPluginsDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
+ {
+ limit,
+ offset,
+ }: {
+ limit?: number;
+ offset?: number;
+ } = {},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useSuspenseQuery<TData, TError>({
- queryKey: Common.UseMonitorServiceGetHealthKeyFn(queryKey),
- queryFn: () => MonitorService.getHealth() as TData,
+ queryKey: Common.UsePluginServiceGetPluginsKeyFn(
+ { limit, offset },
+ queryKey,
+ ),
+ queryFn: () => PluginService.getPlugins({ limit, offset }) as TData,
...options,
});
/**
@@ -573,81 +633,39 @@ export const useProviderServiceGetProvidersSuspense = <
...options,
});
/**
- * Get Plugins
- * @param data The data for the request.
- * @param data.limit
- * @param data.offset
- * @returns PluginCollectionResponse Successful Response
- * @throws ApiError
- */
-export const usePluginServiceGetPluginsSuspense = <
- TData = Common.PluginServiceGetPluginsDefaultResponse,
- TError = unknown,
- TQueryKey extends Array<unknown> = unknown[],
->(
- {
- limit,
- offset,
- }: {
- limit?: number;
- offset?: number;
- } = {},
- queryKey?: TQueryKey,
- options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
-) =>
- useSuspenseQuery<TData, TError>({
- queryKey: Common.UsePluginServiceGetPluginsKeyFn(
- { limit, offset },
- queryKey,
- ),
- queryFn: () => PluginService.getPlugins({ limit, offset }) as TData,
- ...options,
- });
-/**
- * Get Version
- * Get version information.
- * @returns VersionInfo Successful Response
- * @throws ApiError
- */
-export const useVersionServiceGetVersionSuspense = <
- TData = Common.VersionServiceGetVersionDefaultResponse,
- TError = unknown,
- TQueryKey extends Array<unknown> = unknown[],
->(
- queryKey?: TQueryKey,
- options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
-) =>
- useSuspenseQuery<TData, TError>({
- queryKey: Common.UseVersionServiceGetVersionKeyFn(queryKey),
- queryFn: () => VersionService.getVersion() as TData,
- ...options,
- });
-/**
- * Get Event Log
+ * Get Task Instance
+ * Get task instance.
* @param data The data for the request.
- * @param data.eventLogId
- * @returns EventLogResponse Successful Response
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @returns TaskInstanceResponse Successful Response
* @throws ApiError
*/
-export const useEventLogServiceGetEventLogSuspense = <
- TData = Common.EventLogServiceGetEventLogDefaultResponse,
+export const useTaskInstanceServiceGetTaskInstanceSuspense = <
+ TData = Common.TaskInstanceServiceGetTaskInstanceDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
- eventLogId,
+ dagId,
+ dagRunId,
+ taskId,
}: {
- eventLogId: number;
+ dagId: string;
+ dagRunId: string;
+ taskId: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useSuspenseQuery<TData, TError>({
- queryKey: Common.UseEventLogServiceGetEventLogKeyFn(
- { eventLogId },
+ queryKey: Common.UseTaskInstanceServiceGetTaskInstanceKeyFn(
+ { dagId, dagRunId, taskId },
queryKey,
),
- queryFn: () => EventLogService.getEventLog({ eventLogId }) as TData,
+ queryFn: () =>
+ TaskInstanceService.getTaskInstance({ dagId, dagRunId, taskId }) as
TData,
...options,
});
/**
@@ -715,3 +733,22 @@ export const useVariableServiceGetVariablesSuspense = <
VariableService.getVariables({ limit, offset, orderBy }) as TData,
...options,
});
+/**
+ * Get Version
+ * Get version information.
+ * @returns VersionInfo Successful Response
+ * @throws ApiError
+ */
+export const useVersionServiceGetVersionSuspense = <
+ TData = Common.VersionServiceGetVersionDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey: Common.UseVersionServiceGetVersionKeyFn(queryKey),
+ queryFn: () => VersionService.getVersion() as TData,
+ ...options,
+ });
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index f6b1efeb1f1..712cc8cae98 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1692,7 +1692,7 @@ export const $HistoricalMetricDataResponse = {
$ref: "#/components/schemas/DAGRunStates",
},
task_instance_states: {
- $ref: "#/components/schemas/TaskInstanceState",
+ $ref:
"#/components/schemas/airflow__api_fastapi__core_api__serializers__dashboard__TaskInstanceState",
},
},
type: "object",
@@ -1701,6 +1701,133 @@ export const $HistoricalMetricDataResponse = {
description: "Historical Metric Data serializer for responses.",
} as const;
+export const $JobResponse = {
+ properties: {
+ id: {
+ type: "integer",
+ title: "Id",
+ },
+ dag_id: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Dag Id",
+ },
+ state: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "State",
+ },
+ job_type: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Job Type",
+ },
+ start_date: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Start Date",
+ },
+ end_date: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "End Date",
+ },
+ latest_heartbeat: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Latest Heartbeat",
+ },
+ executor_class: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Executor Class",
+ },
+ hostname: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Hostname",
+ },
+ unixname: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Unixname",
+ },
+ },
+ type: "object",
+ required: [
+ "id",
+ "dag_id",
+ "state",
+ "job_type",
+ "start_date",
+ "end_date",
+ "latest_heartbeat",
+ "executor_class",
+ "hostname",
+ "unixname",
+ ],
+ title: "JobResponse",
+ description: "Job serializer for responses.",
+} as const;
+
export const $PluginCollectionResponse = {
properties: {
plugins: {
@@ -2065,81 +2192,306 @@ export const $SchedulerInfoSchema = {
description: "Schema for Scheduler info.",
} as const;
-export const $TaskInstanceState = {
+export const $TaskInstanceResponse = {
properties: {
- no_status: {
- type: "integer",
- title: "No Status",
+ task_id: {
+ type: "string",
+ title: "Task Id",
},
- removed: {
- type: "integer",
- title: "Removed",
+ dag_id: {
+ type: "string",
+ title: "Dag Id",
},
- scheduled: {
- type: "integer",
- title: "Scheduled",
+ dag_run_id: {
+ type: "string",
+ title: "Dag Run Id",
},
- queued: {
+ map_index: {
type: "integer",
- title: "Queued",
+ title: "Map Index",
},
- running: {
- type: "integer",
- title: "Running",
+ logical_date: {
+ type: "string",
+ format: "date-time",
+ title: "Logical Date",
},
- success: {
- type: "integer",
- title: "Success",
+ start_date: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Start Date",
},
- restarting: {
- type: "integer",
- title: "Restarting",
+ end_date: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "End Date",
},
- failed: {
- type: "integer",
- title: "Failed",
+ duration: {
+ anyOf: [
+ {
+ type: "number",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Duration",
},
- up_for_retry: {
- type: "integer",
- title: "Up For Retry",
+ state: {
+ anyOf: [
+ {
+ $ref:
"#/components/schemas/airflow__utils__state__TaskInstanceState",
+ },
+ {
+ type: "null",
+ },
+ ],
},
- up_for_reschedule: {
+ try_number: {
type: "integer",
- title: "Up For Reschedule",
+ title: "Try Number",
},
- upstream_failed: {
+ max_tries: {
type: "integer",
- title: "Upstream Failed",
+ title: "Max Tries",
},
- skipped: {
- type: "integer",
- title: "Skipped",
+ task_display_name: {
+ type: "string",
+ title: "Task Display Name",
+ },
+ hostname: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Hostname",
+ },
+ unixname: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Unixname",
+ },
+ pool: {
+ type: "string",
+ title: "Pool",
},
- deferred: {
+ pool_slots: {
type: "integer",
- title: "Deferred",
+ title: "Pool Slots",
+ },
+ queue: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Queue",
+ },
+ priority_weight: {
+ anyOf: [
+ {
+ type: "integer",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Priority Weight",
+ },
+ operator: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Operator",
+ },
+ queued_when: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Queued When",
+ },
+ pid: {
+ anyOf: [
+ {
+ type: "integer",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Pid",
+ },
+ executor: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Executor",
+ },
+ executor_config: {
+ type: "string",
+ title: "Executor Config",
+ },
+ note: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Note",
+ },
+ rendered_map_index: {
+ anyOf: [
+ {
+ type: "string",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Rendered Map Index",
+ },
+ rendered_fields: {
+ type: "object",
+ title: "Rendered Fields",
+ default: {},
+ },
+ trigger: {
+ anyOf: [
+ {
+ $ref: "#/components/schemas/TriggerResponse",
+ },
+ {
+ type: "null",
+ },
+ ],
+ },
+ triggerer_job: {
+ anyOf: [
+ {
+ $ref: "#/components/schemas/JobResponse",
+ },
+ {
+ type: "null",
+ },
+ ],
},
},
type: "object",
required: [
- "no_status",
- "removed",
- "scheduled",
- "queued",
- "running",
- "success",
- "restarting",
- "failed",
- "up_for_retry",
- "up_for_reschedule",
- "upstream_failed",
- "skipped",
- "deferred",
+ "task_id",
+ "dag_id",
+ "dag_run_id",
+ "map_index",
+ "logical_date",
+ "start_date",
+ "end_date",
+ "duration",
+ "state",
+ "try_number",
+ "max_tries",
+ "task_display_name",
+ "hostname",
+ "unixname",
+ "pool",
+ "pool_slots",
+ "queue",
+ "priority_weight",
+ "operator",
+ "queued_when",
+ "pid",
+ "executor",
+ "executor_config",
+ "note",
+ "rendered_map_index",
+ "trigger",
+ "triggerer_job",
],
- title: "TaskInstanceState",
+ title: "TaskInstanceResponse",
description: "TaskInstance serializer for responses.",
} as const;
+export const $TriggerResponse = {
+ properties: {
+ id: {
+ type: "integer",
+ title: "Id",
+ },
+ classpath: {
+ type: "string",
+ title: "Classpath",
+ },
+ kwargs: {
+ type: "string",
+ title: "Kwargs",
+ },
+ created_date: {
+ type: "string",
+ format: "date-time",
+ title: "Created Date",
+ },
+ triggerer_id: {
+ anyOf: [
+ {
+ type: "integer",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Triggerer Id",
+ },
+ },
+ type: "object",
+ required: ["id", "classpath", "kwargs", "created_date", "triggerer_id"],
+ title: "TriggerResponse",
+ description: "Trigger serializer for responses.",
+} as const;
+
export const $TriggererInfoSchema = {
properties: {
status: {
@@ -2314,3 +2666,101 @@ export const $VersionInfo = {
title: "VersionInfo",
description: "Version information serializer for responses.",
} as const;
+
+export const
$airflow__api_fastapi__core_api__serializers__dashboard__TaskInstanceState =
+ {
+ properties: {
+ no_status: {
+ type: "integer",
+ title: "No Status",
+ },
+ removed: {
+ type: "integer",
+ title: "Removed",
+ },
+ scheduled: {
+ type: "integer",
+ title: "Scheduled",
+ },
+ queued: {
+ type: "integer",
+ title: "Queued",
+ },
+ running: {
+ type: "integer",
+ title: "Running",
+ },
+ success: {
+ type: "integer",
+ title: "Success",
+ },
+ restarting: {
+ type: "integer",
+ title: "Restarting",
+ },
+ failed: {
+ type: "integer",
+ title: "Failed",
+ },
+ up_for_retry: {
+ type: "integer",
+ title: "Up For Retry",
+ },
+ up_for_reschedule: {
+ type: "integer",
+ title: "Up For Reschedule",
+ },
+ upstream_failed: {
+ type: "integer",
+ title: "Upstream Failed",
+ },
+ skipped: {
+ type: "integer",
+ title: "Skipped",
+ },
+ deferred: {
+ type: "integer",
+ title: "Deferred",
+ },
+ },
+ type: "object",
+ required: [
+ "no_status",
+ "removed",
+ "scheduled",
+ "queued",
+ "running",
+ "success",
+ "restarting",
+ "failed",
+ "up_for_retry",
+ "up_for_reschedule",
+ "upstream_failed",
+ "skipped",
+ "deferred",
+ ],
+ title: "TaskInstanceState",
+ description: "TaskInstance serializer for responses.",
+ } as const;
+
+export const $airflow__utils__state__TaskInstanceState = {
+ type: "string",
+ enum: [
+ "removed",
+ "scheduled",
+ "queued",
+ "running",
+ "success",
+ "restarting",
+ "failed",
+ "up_for_retry",
+ "up_for_reschedule",
+ "upstream_failed",
+ "skipped",
+ "deferred",
+ ],
+ title: "TaskInstanceState",
+ description: `All possible states that a Task Instance can be in.
+
+Note that None is also allowed, so always use this in a type hint with
Optional.`,
+} as const;
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow/ui/openapi-gen/requests/services.gen.ts
index 36c4fa46d79..fd38b2ec31c 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -15,6 +15,14 @@ import type {
GetConnectionResponse,
GetConnectionsData,
GetConnectionsResponse,
+ GetDagRunData,
+ GetDagRunResponse,
+ DeleteDagRunData,
+ DeleteDagRunResponse,
+ PatchDagRunStateData,
+ PatchDagRunStateResponse,
+ GetDagSourceData,
+ GetDagSourceResponse,
GetDagsData,
GetDagsResponse,
PatchDagsData,
@@ -29,15 +37,11 @@ import type {
DeleteDagResponse,
GetDagDetailsData,
GetDagDetailsResponse,
- GetDagRunData,
- GetDagRunResponse,
- DeleteDagRunData,
- DeleteDagRunResponse,
- PatchDagRunStateData,
- PatchDagRunStateResponse,
- GetDagSourceData,
- GetDagSourceResponse,
+ GetEventLogData,
+ GetEventLogResponse,
GetHealthResponse,
+ GetPluginsData,
+ GetPluginsResponse,
DeletePoolData,
DeletePoolResponse,
GetPoolData,
@@ -50,11 +54,8 @@ import type {
PostPoolResponse,
GetProvidersData,
GetProvidersResponse,
- GetPluginsData,
- GetPluginsResponse,
- GetVersionResponse,
- GetEventLogData,
- GetEventLogResponse,
+ GetTaskInstanceData,
+ GetTaskInstanceResponse,
DeleteVariableData,
DeleteVariableResponse,
GetVariableData,
@@ -65,6 +66,7 @@ import type {
GetVariablesResponse,
PostVariableData,
PostVariableResponse,
+ GetVersionResponse,
} from "./types.gen";
export class AssetService {
@@ -246,6 +248,134 @@ export class ConnectionService {
}
}
+export class DagRunService {
+ /**
+ * Get Dag Run
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @returns DAGRunResponse Successful Response
+ * @throws ApiError
+ */
+ public static getDagRun(
+ data: GetDagRunData,
+ ): CancelablePromise<GetDagRunResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}",
+ path: {
+ dag_id: data.dagId,
+ dag_run_id: data.dagRunId,
+ },
+ errors: {
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
+
+ /**
+ * Delete Dag Run
+ * Delete a DAG Run entry.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @returns void Successful Response
+ * @throws ApiError
+ */
+ public static deleteDagRun(
+ data: DeleteDagRunData,
+ ): CancelablePromise<DeleteDagRunResponse> {
+ return __request(OpenAPI, {
+ method: "DELETE",
+ url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}",
+ path: {
+ dag_id: data.dagId,
+ dag_run_id: data.dagRunId,
+ },
+ errors: {
+ 400: "Bad Request",
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
+
+ /**
+ * Patch Dag Run State
+ * Modify a DAG Run.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.requestBody
+ * @param data.updateMask
+ * @returns DAGRunResponse Successful Response
+ * @throws ApiError
+ */
+ public static patchDagRunState(
+ data: PatchDagRunStateData,
+ ): CancelablePromise<PatchDagRunStateResponse> {
+ return __request(OpenAPI, {
+ method: "PATCH",
+ url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}",
+ path: {
+ dag_id: data.dagId,
+ dag_run_id: data.dagRunId,
+ },
+ query: {
+ update_mask: data.updateMask,
+ },
+ body: data.requestBody,
+ mediaType: "application/json",
+ errors: {
+ 400: "Bad Request",
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
+}
+
+export class DagSourceService {
+ /**
+ * Get Dag Source
+ * Get source code using file token.
+ * @param data The data for the request.
+ * @param data.fileToken
+ * @param data.accept
+ * @returns DAGSourceResponse Successful Response
+ * @throws ApiError
+ */
+ public static getDagSource(
+ data: GetDagSourceData,
+ ): CancelablePromise<GetDagSourceResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url: "/public/dagSources/{file_token}",
+ path: {
+ file_token: data.fileToken,
+ },
+ headers: {
+ accept: data.accept,
+ },
+ errors: {
+ 400: "Bad Request",
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 406: "Not Acceptable",
+ 422: "Validation Error",
+ },
+ });
+ }
+}
+
export class DagService {
/**
* Get Dags
@@ -479,55 +609,24 @@ export class DagService {
}
}
-export class DagRunService {
+export class EventLogService {
/**
- * Get Dag Run
+ * Get Event Log
* @param data The data for the request.
- * @param data.dagId
- * @param data.dagRunId
- * @returns DAGRunResponse Successful Response
+ * @param data.eventLogId
+ * @returns EventLogResponse Successful Response
* @throws ApiError
*/
- public static getDagRun(
- data: GetDagRunData,
- ): CancelablePromise<GetDagRunResponse> {
+ public static getEventLog(
+ data: GetEventLogData,
+ ): CancelablePromise<GetEventLogResponse> {
return __request(OpenAPI, {
method: "GET",
- url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}",
- path: {
- dag_id: data.dagId,
- dag_run_id: data.dagRunId,
- },
- errors: {
- 401: "Unauthorized",
- 403: "Forbidden",
- 404: "Not Found",
- 422: "Validation Error",
- },
- });
- }
-
- /**
- * Delete Dag Run
- * Delete a DAG Run entry.
- * @param data The data for the request.
- * @param data.dagId
- * @param data.dagRunId
- * @returns void Successful Response
- * @throws ApiError
- */
- public static deleteDagRun(
- data: DeleteDagRunData,
- ): CancelablePromise<DeleteDagRunResponse> {
- return __request(OpenAPI, {
- method: "DELETE",
- url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}",
+ url: "/public/eventLogs/{event_log_id}",
path: {
- dag_id: data.dagId,
- dag_run_id: data.dagRunId,
+ event_log_id: data.eventLogId,
},
errors: {
- 400: "Bad Request",
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
@@ -535,92 +634,48 @@ export class DagRunService {
},
});
}
+}
+export class MonitorService {
/**
- * Patch Dag Run State
- * Modify a DAG Run.
- * @param data The data for the request.
- * @param data.dagId
- * @param data.dagRunId
- * @param data.requestBody
- * @param data.updateMask
- * @returns DAGRunResponse Successful Response
+ * Get Health
+ * @returns HealthInfoSchema Successful Response
* @throws ApiError
*/
- public static patchDagRunState(
- data: PatchDagRunStateData,
- ): CancelablePromise<PatchDagRunStateResponse> {
+ public static getHealth(): CancelablePromise<GetHealthResponse> {
return __request(OpenAPI, {
- method: "PATCH",
- url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}",
- path: {
- dag_id: data.dagId,
- dag_run_id: data.dagRunId,
- },
- query: {
- update_mask: data.updateMask,
- },
- body: data.requestBody,
- mediaType: "application/json",
- errors: {
- 400: "Bad Request",
- 401: "Unauthorized",
- 403: "Forbidden",
- 404: "Not Found",
- 422: "Validation Error",
- },
+ method: "GET",
+ url: "/public/monitor/health",
});
}
}
-export class DagSourceService {
+export class PluginService {
/**
- * Get Dag Source
- * Get source code using file token.
+ * Get Plugins
* @param data The data for the request.
- * @param data.fileToken
- * @param data.accept
- * @returns DAGSourceResponse Successful Response
+ * @param data.limit
+ * @param data.offset
+ * @returns PluginCollectionResponse Successful Response
* @throws ApiError
*/
- public static getDagSource(
- data: GetDagSourceData,
- ): CancelablePromise<GetDagSourceResponse> {
+ public static getPlugins(
+ data: GetPluginsData = {},
+ ): CancelablePromise<GetPluginsResponse> {
return __request(OpenAPI, {
method: "GET",
- url: "/public/dagSources/{file_token}",
- path: {
- file_token: data.fileToken,
- },
- headers: {
- accept: data.accept,
+ url: "/public/plugins/",
+ query: {
+ limit: data.limit,
+ offset: data.offset,
},
errors: {
- 400: "Bad Request",
- 401: "Unauthorized",
- 403: "Forbidden",
- 404: "Not Found",
- 406: "Not Acceptable",
422: "Validation Error",
},
});
}
}
-export class MonitorService {
- /**
- * Get Health
- * @returns HealthInfoSchema Successful Response
- * @throws ApiError
- */
- public static getHealth(): CancelablePromise<GetHealthResponse> {
- return __request(OpenAPI, {
- method: "GET",
- url: "/public/monitor/health",
- });
- }
-}
-
export class PoolService {
/**
* Delete Pool
@@ -789,63 +844,27 @@ export class ProviderService {
}
}
-export class PluginService {
- /**
- * Get Plugins
- * @param data The data for the request.
- * @param data.limit
- * @param data.offset
- * @returns PluginCollectionResponse Successful Response
- * @throws ApiError
- */
- public static getPlugins(
- data: GetPluginsData = {},
- ): CancelablePromise<GetPluginsResponse> {
- return __request(OpenAPI, {
- method: "GET",
- url: "/public/plugins/",
- query: {
- limit: data.limit,
- offset: data.offset,
- },
- errors: {
- 422: "Validation Error",
- },
- });
- }
-}
-
-export class VersionService {
- /**
- * Get Version
- * Get version information.
- * @returns VersionInfo Successful Response
- * @throws ApiError
- */
- public static getVersion(): CancelablePromise<GetVersionResponse> {
- return __request(OpenAPI, {
- method: "GET",
- url: "/public/version/",
- });
- }
-}
-
-export class EventLogService {
+export class TaskInstanceService {
/**
- * Get Event Log
+ * Get Task Instance
+ * Get task instance.
* @param data The data for the request.
- * @param data.eventLogId
- * @returns EventLogResponse Successful Response
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @returns TaskInstanceResponse Successful Response
* @throws ApiError
*/
- public static getEventLog(
- data: GetEventLogData,
- ): CancelablePromise<GetEventLogResponse> {
+ public static getTaskInstance(
+ data: GetTaskInstanceData,
+ ): CancelablePromise<GetTaskInstanceResponse> {
return __request(OpenAPI, {
method: "GET",
- url: "/public/eventLogs/{event_log_id}",
+ url:
"/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}",
path: {
- event_log_id: data.eventLogId,
+ dag_id: data.dagId,
+ dag_run_id: data.dagRunId,
+ task_id: data.taskId,
},
errors: {
401: "Unauthorized",
@@ -997,3 +1016,18 @@ export class VariableService {
});
}
}
+
+export class VersionService {
+ /**
+ * Get Version
+ * Get version information.
+ * @returns VersionInfo Successful Response
+ * @throws ApiError
+ */
+ public static getVersion(): CancelablePromise<GetVersionResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url: "/public/version/",
+ });
+ }
+}
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 52862ae1299..064163f4178 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -370,7 +370,23 @@ export type HealthInfoSchema = {
export type HistoricalMetricDataResponse = {
dag_run_types: DAGRunTypes;
dag_run_states: DAGRunStates;
- task_instance_states: TaskInstanceState;
+ task_instance_states:
airflow__api_fastapi__core_api__serializers__dashboard__TaskInstanceState;
+};
+
+/**
+ * Job serializer for responses.
+ */
+export type JobResponse = {
+ id: number;
+ dag_id: string | null;
+ state: string | null;
+ job_type: string | null;
+ start_date: string | null;
+ end_date: string | null;
+ latest_heartbeat: string | null;
+ executor_class: string | null;
+ hostname: string | null;
+ unixname: string | null;
};
/**
@@ -471,20 +487,48 @@ export type SchedulerInfoSchema = {
/**
* TaskInstance serializer for responses.
*/
-export type TaskInstanceState = {
- no_status: number;
- removed: number;
- scheduled: number;
- queued: number;
- running: number;
- success: number;
- restarting: number;
- failed: number;
- up_for_retry: number;
- up_for_reschedule: number;
- upstream_failed: number;
- skipped: number;
- deferred: number;
+export type TaskInstanceResponse = {
+ task_id: string;
+ dag_id: string;
+ dag_run_id: string;
+ map_index: number;
+ logical_date: string;
+ start_date: string | null;
+ end_date: string | null;
+ duration: number | null;
+ state: airflow__utils__state__TaskInstanceState | null;
+ try_number: number;
+ max_tries: number;
+ task_display_name: string;
+ hostname: string | null;
+ unixname: string | null;
+ pool: string;
+ pool_slots: number;
+ queue: string | null;
+ priority_weight: number | null;
+ operator: string | null;
+ queued_when: string | null;
+ pid: number | null;
+ executor: string | null;
+ executor_config: string;
+ note: string | null;
+ rendered_map_index: string | null;
+ rendered_fields?: {
+ [key: string]: unknown;
+ };
+ trigger: TriggerResponse | null;
+ triggerer_job: JobResponse | null;
+};
+
+/**
+ * Trigger serializer for responses.
+ */
+export type TriggerResponse = {
+ id: number;
+ classpath: string;
+ kwargs: string;
+ created_date: string;
+ triggerer_id: number | null;
};
/**
@@ -535,6 +579,45 @@ export type VersionInfo = {
git_version: string | null;
};
+/**
+ * TaskInstance serializer for responses.
+ */
+export type
airflow__api_fastapi__core_api__serializers__dashboard__TaskInstanceState =
+ {
+ no_status: number;
+ removed: number;
+ scheduled: number;
+ queued: number;
+ running: number;
+ success: number;
+ restarting: number;
+ failed: number;
+ up_for_retry: number;
+ up_for_reschedule: number;
+ upstream_failed: number;
+ skipped: number;
+ deferred: number;
+ };
+
+/**
+ * All possible states that a Task Instance can be in.
+ *
+ * Note that None is also allowed, so always use this in a type hint with
Optional.
+ */
+export type airflow__utils__state__TaskInstanceState =
+ | "removed"
+ | "scheduled"
+ | "queued"
+ | "running"
+ | "success"
+ | "restarting"
+ | "failed"
+ | "up_for_retry"
+ | "up_for_reschedule"
+ | "upstream_failed"
+ | "skipped"
+ | "deferred";
+
export type NextRunAssetsData = {
dagId: string;
};
@@ -585,6 +668,36 @@ export type GetConnectionsData = {
export type GetConnectionsResponse = ConnectionCollectionResponse;
+export type GetDagRunData = {
+ dagId: string;
+ dagRunId: string;
+};
+
+export type GetDagRunResponse = DAGRunResponse;
+
+export type DeleteDagRunData = {
+ dagId: string;
+ dagRunId: string;
+};
+
+export type DeleteDagRunResponse = void;
+
+export type PatchDagRunStateData = {
+ dagId: string;
+ dagRunId: string;
+ requestBody: DAGRunPatchBody;
+ updateMask?: Array<string> | null;
+};
+
+export type PatchDagRunStateResponse = DAGRunResponse;
+
+export type GetDagSourceData = {
+ accept?: string;
+ fileToken: string;
+};
+
+export type GetDagSourceResponse = DAGSourceResponse;
+
export type GetDagsData = {
dagDisplayNamePattern?: string | null;
dagIdPattern?: string | null;
@@ -650,37 +763,20 @@ export type GetDagDetailsData = {
export type GetDagDetailsResponse = DAGDetailsResponse;
-export type GetDagRunData = {
- dagId: string;
- dagRunId: string;
-};
-
-export type GetDagRunResponse = DAGRunResponse;
-
-export type DeleteDagRunData = {
- dagId: string;
- dagRunId: string;
+export type GetEventLogData = {
+ eventLogId: number;
};
-export type DeleteDagRunResponse = void;
-
-export type PatchDagRunStateData = {
- dagId: string;
- dagRunId: string;
- requestBody: DAGRunPatchBody;
- updateMask?: Array<string> | null;
-};
+export type GetEventLogResponse = EventLogResponse;
-export type PatchDagRunStateResponse = DAGRunResponse;
+export type GetHealthResponse = HealthInfoSchema;
-export type GetDagSourceData = {
- accept?: string;
- fileToken: string;
+export type GetPluginsData = {
+ limit?: number;
+ offset?: number;
};
-export type GetDagSourceResponse = DAGSourceResponse;
-
-export type GetHealthResponse = HealthInfoSchema;
+export type GetPluginsResponse = PluginCollectionResponse;
export type DeletePoolData = {
poolName: string;
@@ -723,20 +819,13 @@ export type GetProvidersData = {
export type GetProvidersResponse = ProviderCollectionResponse;
-export type GetPluginsData = {
- limit?: number;
- offset?: number;
+export type GetTaskInstanceData = {
+ dagId: string;
+ dagRunId: string;
+ taskId: string;
};
-export type GetPluginsResponse = PluginCollectionResponse;
-
-export type GetVersionResponse = VersionInfo;
-
-export type GetEventLogData = {
- eventLogId: number;
-};
-
-export type GetEventLogResponse = EventLogResponse;
+export type GetTaskInstanceResponse = TaskInstanceResponse;
export type DeleteVariableData = {
variableKey: string;
@@ -772,6 +861,8 @@ export type PostVariableData = {
export type PostVariableResponse = VariableResponse;
+export type GetVersionResponse = VersionInfo;
+
export type $OpenApiTs = {
"/ui/next_run_assets/{dag_id}": {
get: {
@@ -903,27 +994,39 @@ export type $OpenApiTs = {
};
};
};
- "/public/dags/": {
+ "/public/dags/{dag_id}/dagRuns/{dag_run_id}": {
get: {
- req: GetDagsData;
+ req: GetDagRunData;
res: {
/**
* Successful Response
*/
- 200: DAGCollectionResponse;
+ 200: DAGRunResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
/**
* Validation Error
*/
422: HTTPValidationError;
};
};
- patch: {
- req: PatchDagsData;
+ delete: {
+ req: DeleteDagRunData;
res: {
/**
* Successful Response
*/
- 200: DAGCollectionResponse;
+ 204: void;
/**
* Bad Request
*/
@@ -946,15 +1049,17 @@ export type $OpenApiTs = {
422: HTTPValidationError;
};
};
- };
- "/public/dags/tags": {
- get: {
- req: GetDagTagsData;
+ patch: {
+ req: PatchDagRunStateData;
res: {
/**
* Successful Response
*/
- 200: DAGTagCollectionResponse;
+ 200: DAGRunResponse;
+ /**
+ * Bad Request
+ */
+ 400: HTTPExceptionResponse;
/**
* Unauthorized
*/
@@ -963,6 +1068,10 @@ export type $OpenApiTs = {
* Forbidden
*/
403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
/**
* Validation Error
*/
@@ -970,14 +1079,14 @@ export type $OpenApiTs = {
};
};
};
- "/public/dags/{dag_id}": {
+ "/public/dagSources/{file_token}": {
get: {
- req: GetDagData;
+ req: GetDagSourceData;
res: {
/**
* Successful Response
*/
- 200: DAGResponse;
+ 200: DAGSourceResponse;
/**
* Bad Request
*/
@@ -995,18 +1104,37 @@ export type $OpenApiTs = {
*/
404: HTTPExceptionResponse;
/**
- * Unprocessable Entity
+ * Not Acceptable
*/
- 422: HTTPExceptionResponse;
+ 406: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
+ "/public/dags/": {
+ get: {
+ req: GetDagsData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: DAGCollectionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
};
};
patch: {
- req: PatchDagData;
+ req: PatchDagsData;
res: {
/**
* Successful Response
*/
- 200: DAGResponse;
+ 200: DAGCollectionResponse;
/**
* Bad Request
*/
@@ -1029,17 +1157,15 @@ export type $OpenApiTs = {
422: HTTPValidationError;
};
};
- delete: {
- req: DeleteDagData;
+ };
+ "/public/dags/tags": {
+ get: {
+ req: GetDagTagsData;
res: {
/**
* Successful Response
*/
- 200: unknown;
- /**
- * Bad Request
- */
- 400: HTTPExceptionResponse;
+ 200: DAGTagCollectionResponse;
/**
* Unauthorized
*/
@@ -1049,24 +1175,20 @@ export type $OpenApiTs = {
*/
403: HTTPExceptionResponse;
/**
- * Not Found
- */
- 404: HTTPExceptionResponse;
- /**
- * Unprocessable Entity
+ * Validation Error
*/
- 422: HTTPExceptionResponse;
+ 422: HTTPValidationError;
};
};
};
- "/public/dags/{dag_id}/details": {
+ "/public/dags/{dag_id}": {
get: {
- req: GetDagDetailsData;
+ req: GetDagData;
res: {
/**
* Successful Response
*/
- 200: DAGDetailsResponse;
+ 200: DAGResponse;
/**
* Bad Request
*/
@@ -1089,15 +1211,17 @@ export type $OpenApiTs = {
422: HTTPExceptionResponse;
};
};
- };
- "/public/dags/{dag_id}/dagRuns/{dag_run_id}": {
- get: {
- req: GetDagRunData;
+ patch: {
+ req: PatchDagData;
res: {
/**
* Successful Response
*/
- 200: DAGRunResponse;
+ 200: DAGResponse;
+ /**
+ * Bad Request
+ */
+ 400: HTTPExceptionResponse;
/**
* Unauthorized
*/
@@ -1117,12 +1241,12 @@ export type $OpenApiTs = {
};
};
delete: {
- req: DeleteDagRunData;
+ req: DeleteDagData;
res: {
/**
* Successful Response
*/
- 204: void;
+ 200: unknown;
/**
* Bad Request
*/
@@ -1140,18 +1264,20 @@ export type $OpenApiTs = {
*/
404: HTTPExceptionResponse;
/**
- * Validation Error
+ * Unprocessable Entity
*/
- 422: HTTPValidationError;
+ 422: HTTPExceptionResponse;
};
};
- patch: {
- req: PatchDagRunStateData;
+ };
+ "/public/dags/{dag_id}/details": {
+ get: {
+ req: GetDagDetailsData;
res: {
/**
* Successful Response
*/
- 200: DAGRunResponse;
+ 200: DAGDetailsResponse;
/**
* Bad Request
*/
@@ -1169,24 +1295,20 @@ export type $OpenApiTs = {
*/
404: HTTPExceptionResponse;
/**
- * Validation Error
+ * Unprocessable Entity
*/
- 422: HTTPValidationError;
+ 422: HTTPExceptionResponse;
};
};
};
- "/public/dagSources/{file_token}": {
+ "/public/eventLogs/{event_log_id}": {
get: {
- req: GetDagSourceData;
+ req: GetEventLogData;
res: {
/**
* Successful Response
*/
- 200: DAGSourceResponse;
- /**
- * Bad Request
- */
- 400: HTTPExceptionResponse;
+ 200: EventLogResponse;
/**
* Unauthorized
*/
@@ -1199,10 +1321,6 @@ export type $OpenApiTs = {
* Not Found
*/
404: HTTPExceptionResponse;
- /**
- * Not Acceptable
- */
- 406: HTTPExceptionResponse;
/**
* Validation Error
*/
@@ -1220,6 +1338,21 @@ export type $OpenApiTs = {
};
};
};
+ "/public/plugins/": {
+ get: {
+ req: GetPluginsData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: PluginCollectionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
"/public/pools/{pool_name}": {
delete: {
req: DeletePoolData;
@@ -1368,39 +1501,14 @@ export type $OpenApiTs = {
};
};
};
- "/public/plugins/": {
- get: {
- req: GetPluginsData;
- res: {
- /**
- * Successful Response
- */
- 200: PluginCollectionResponse;
- /**
- * Validation Error
- */
- 422: HTTPValidationError;
- };
- };
- };
- "/public/version/": {
- get: {
- res: {
- /**
- * Successful Response
- */
- 200: VersionInfo;
- };
- };
- };
- "/public/eventLogs/{event_log_id}": {
+ "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}": {
get: {
- req: GetEventLogData;
+ req: GetTaskInstanceData;
res: {
/**
* Successful Response
*/
- 200: EventLogResponse;
+ 200: TaskInstanceResponse;
/**
* Unauthorized
*/
@@ -1545,4 +1653,14 @@ export type $OpenApiTs = {
};
};
};
+ "/public/version/": {
+ get: {
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: VersionInfo;
+ };
+ };
+ };
};
diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py b/tests/api_fastapi/core_api/routes/public/test_task_instances.py
new file mode 100644
index 00000000000..85b4639d6c0
--- /dev/null
+++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -0,0 +1,396 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import datetime as dt
+import urllib
+
+import pendulum
+import pytest
+
+from airflow.jobs.job import Job
+from airflow.jobs.triggerer_job_runner import TriggererJobRunner
+from airflow.models import DagRun, TaskInstance
+from airflow.models.dagbag import DagBag
+from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
+from airflow.models.trigger import Trigger
+from airflow.utils.platform import getuser
+from airflow.utils.state import State
+from airflow.utils.timezone import datetime
+from airflow.utils.types import DagRunType
+
+from tests_common.test_utils.db import clear_db_runs, clear_rendered_ti_fields
+
+# Apply the ``db_test`` marker to every test in this module.
+pytestmark = pytest.mark.db_test
+
+
+# Timezone-aware (UTC) reference timestamp; the test fixture below uses it as the default
+# execution date for the runs it creates.
+DEFAULT_DATETIME_1 = datetime(2020, 1, 1, tzinfo=dt.timezone.utc)
+# ISO-8601 string forms of the reference timestamp and of the day after it.
+DEFAULT_DATETIME_STR_1 = "2020-01-01T00:00:00+00:00"
+DEFAULT_DATETIME_STR_2 = "2020-01-02T00:00:00+00:00"
+
+# Percent-encoded variants of the two strings above (``urllib.parse.quote`` escapes the
+# ":" and "+" characters). Presumably meant for embedding in URLs; they are not referenced
+# in the visible part of this file.
+QUOTED_DEFAULT_DATETIME_STR_1 = urllib.parse.quote(DEFAULT_DATETIME_STR_1)
+QUOTED_DEFAULT_DATETIME_STR_2 = urllib.parse.quote(DEFAULT_DATETIME_STR_2)
+
+
+class TestTaskInstanceEndpoint:
+    """Shared setup and task-instance factory for the task instance endpoint tests."""
+
+    def setup_method(self):
+        """Call ``clear_db_runs`` before every test."""
+        clear_db_runs()
+
+    def teardown_method(self):
+        """Call ``clear_db_runs`` after every test."""
+        clear_db_runs()
+
+    @pytest.fixture(autouse=True)
+    def setup_attrs(self, session) -> None:
+        """Reset the per-test defaults and sync the example DAGs to the database.
+
+        Sets ``default_time``, ``ti_init``, ``ti_extras`` and ``dagbag`` on the test instance.
+        """
+        self.default_time = DEFAULT_DATETIME_1
+        # Keyword arguments forwarded to the ``TaskInstance`` constructor.
+        self.ti_init = {
+            "execution_date": self.default_time,
+            "state": State.RUNNING,
+        }
+        # Attributes assigned onto each task instance after it has been constructed.
+        self.ti_extras = {
+            "start_date": self.default_time + dt.timedelta(days=1),
+            "end_date": self.default_time + dt.timedelta(days=2),
+            "pid": 100,
+            "duration": 10000,
+            "pool": "default_pool",
+            "queue": "default_queue",
+            "job_id": 0,
+        }
+        clear_db_runs()
+        clear_rendered_ti_fields()
+        dagbag = DagBag(include_examples=True, read_dags_from_db=False)
+        dagbag.sync_to_db()
+        self.dagbag = dagbag
+
+    def create_task_instances(
+        self,
+        session,
+        dag_id: str = "example_python_operator",
+        update_extras: bool = True,
+        task_instances=None,
+        dag_run_state=State.RUNNING,
+        with_ti_history=False,
+    ):
+        """Create, persist and return task instances for the tasks of ``dag_id``.
+
+        One task instance is created per task of the DAG, in task order. When
+        ``task_instances`` is given, at most ``len(task_instances)`` are created and
+        entry ``i`` customises the ``i``-th one.
+
+        :param session: database session used to add and commit the created rows.
+        :param dag_id: DAG looked up in ``self.dagbag``.
+        :param update_extras: when true, each ``task_instances`` entry is merged into
+            ``self.ti_extras`` (attributes set after construction); otherwise it is merged
+            into ``self.ti_init`` (constructor keyword arguments).
+        :param task_instances: optional list of per-task-instance override dicts.
+        :param dag_run_state: state given to every ``DagRun`` created here.
+        :param with_ti_history: when true, the task instances are additionally saved with
+            ``try_number`` 1, ``dag.clear()`` is called, and they are saved again with
+            ``try_number`` 2 (presumably so that a first try exists as history -- confirm).
+        :return: list of the created task instances.
+
+        Note that ``self.ti_init`` and ``self.ti_extras`` are mutated in place: overrides
+        accumulate from one task instance to the next, and ``execution_date`` is removed
+        from ``self.ti_init``.
+        """
+        dag = self.dagbag.get_dag(dag_id)
+        tasks = dag.tasks
+        counter = len(tasks)
+        if task_instances is not None:
+            counter = min(len(task_instances), counter)
+
+        run_id = "TEST_DAG_RUN_ID"
+        execution_date = self.ti_init.pop("execution_date", self.default_time)
+        dr = None
+
+        tis = []
+        for i in range(counter):
+            if task_instances is not None:
+                target = self.ti_extras if update_extras else self.ti_init
+                target.update(task_instances[i])
+
+            # An override that carries its own execution date gets a dedicated DagRun.
+            if "execution_date" in self.ti_init:
+                run_id = f"TEST_DAG_RUN_ID_{i}"
+                execution_date = self.ti_init.pop("execution_date")
+                dr = None
+
+            if not dr:
+                dr = DagRun(
+                    run_id=run_id,
+                    dag_id=dag_id,
+                    execution_date=execution_date,
+                    run_type=DagRunType.MANUAL,
+                    state=dag_run_state,
+                )
+                session.add(dr)
+            ti = TaskInstance(task=tasks[i], **self.ti_init)
+            session.add(ti)
+            ti.dag_run = dr
+            ti.note = "placeholder-note"
+
+            for key, value in self.ti_extras.items():
+                setattr(ti, key, value)
+            tis.append(ti)
+
+        session.commit()
+        if with_ti_history:
+            for ti in tis:
+                ti.try_number = 1
+                session.merge(ti)
+            session.commit()
+            dag.clear()
+            for ti in tis:
+                ti.try_number = 2
+                ti.queue = "default_queue"
+                session.merge(ti)
+            session.commit()
+        return tis
+
+
+class TestGetTaskInstance(TestTaskInstanceEndpoint):
+    """Tests for fetching a single task instance by DAG id, run id and task id."""
+
+    # URL of the first task instance produced by ``create_task_instances`` with its defaults.
+    TASK_INSTANCE_URL = (
+        "/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context"
+    )
+    # Error detail returned when the requested task instance only exists with map indexes.
+    MAPPED_TI_DETAIL = "Task instance is mapped, add the map_index value to the URL"
+
+    @staticmethod
+    def _expected_response(**overrides):
+        """Return the payload expected for the default task instance.
+
+        Keyword arguments replace the corresponding top-level keys, so each test only
+        spells out the fields in which it differs from the default.
+        """
+        expected = {
+            "dag_id": "example_python_operator",
+            "duration": 10000.0,
+            "end_date": "2020-01-03T00:00:00Z",
+            "logical_date": "2020-01-01T00:00:00Z",
+            "executor": None,
+            "executor_config": "{}",
+            "hostname": "",
+            "map_index": -1,
+            "max_tries": 0,
+            "note": "placeholder-note",
+            "operator": "PythonOperator",
+            "pid": 100,
+            "pool": "default_pool",
+            "pool_slots": 1,
+            "priority_weight": 9,
+            "queue": "default_queue",
+            "queued_when": None,
+            "start_date": "2020-01-02T00:00:00Z",
+            "state": "running",
+            "task_id": "print_the_context",
+            "task_display_name": "print_the_context",
+            "try_number": 0,
+            "unixname": getuser(),
+            "dag_run_id": "TEST_DAG_RUN_ID",
+            "rendered_fields": {},
+            "rendered_map_index": None,
+            "trigger": None,
+            "triggerer_job": None,
+        }
+        expected.update(overrides)
+        return expected
+
+    @staticmethod
+    def _replace_with_mapped_copies(session, old_ti, map_indexes):
+        """Swap ``old_ti`` for one copy per entry of ``map_indexes`` and commit."""
+        for index in map_indexes:
+            ti = TaskInstance(task=old_ti.task, run_id=old_ti.run_id, map_index=index)
+            for attr in ["duration", "end_date", "pid", "start_date", "state", "queue", "note"]:
+                setattr(ti, attr, getattr(old_ti, attr))
+            session.add(ti)
+        session.delete(old_ti)
+        session.commit()
+
+    def test_should_respond_200(self, test_client, session):
+        self.create_task_instances(session)
+        # Update ti and set operator to None to
+        # test that operator field is nullable.
+        # This prevents issue when users upgrade to 2.0+
+        # from 1.10.x
+        # https://github.com/apache/airflow/issues/14421
+        session.query(TaskInstance).update({TaskInstance.operator: None}, synchronize_session="fetch")
+        session.commit()
+        response = test_client.get(self.TASK_INSTANCE_URL)
+
+        assert response.status_code == 200
+        assert response.json() == self._expected_response(operator=None)
+
+    def test_should_respond_200_with_task_state_in_deferred(self, test_client, session):
+        now = pendulum.now("UTC")
+        ti = self.create_task_instances(
+            session, task_instances=[{"state": State.DEFERRED}], update_extras=True
+        )[0]
+        ti.trigger = Trigger("none", {})
+        ti.trigger.created_date = now
+        ti.triggerer_job = Job()
+        # The runner instance is discarded; it is built for its effect on the job
+        # (presumably it sets the job type asserted below -- confirm).
+        TriggererJobRunner(job=ti.triggerer_job)
+        ti.triggerer_job.state = "running"
+        session.commit()
+        response = test_client.get(self.TASK_INSTANCE_URL)
+
+        # Check the status first: an error payload has no nested trigger data, and
+        # stripping keys from it below would hide the real failure behind a KeyError.
+        assert response.status_code == 200
+        data = response.json()
+
+        # this logic in effect replicates mock.ANY for these values
+        values_to_ignore = {
+            "trigger": ["created_date", "id", "triggerer_id"],
+            "triggerer_job": ["executor_class", "hostname", "id", "latest_heartbeat", "start_date"],
+        }
+        for k, v in values_to_ignore.items():
+            for elem in v:
+                del data[k][elem]
+
+        assert data == self._expected_response(
+            state="deferred",
+            trigger={
+                "classpath": "none",
+                "kwargs": "{}",
+            },
+            triggerer_job={
+                "dag_id": None,
+                "end_date": None,
+                "job_type": "TriggererJob",
+                "state": "running",
+                "unixname": getuser(),
+            },
+        )
+
+    def test_should_respond_200_with_task_state_in_removed(self, test_client, session):
+        self.create_task_instances(session, task_instances=[{"state": State.REMOVED}], update_extras=True)
+        response = test_client.get(self.TASK_INSTANCE_URL)
+
+        assert response.status_code == 200
+        assert response.json() == self._expected_response(state="removed")
+
+    def test_should_respond_200_task_instance_with_rendered(self, test_client, session):
+        tis = self.create_task_instances(session)
+        rendered_fields = RTIF(tis[0], render_templates=False)
+        session.add(rendered_fields)
+        session.commit()
+        response = test_client.get(self.TASK_INSTANCE_URL)
+
+        assert response.status_code == 200
+        assert response.json() == self._expected_response(
+            rendered_fields={"op_args": [], "op_kwargs": {}, "templates_dict": None},
+        )
+
+    def test_raises_404_for_nonexistent_task_instance(self, test_client):
+        response = test_client.get(self.TASK_INSTANCE_URL)
+
+        assert response.status_code == 404
+        assert response.json() == {
+            "detail": (
+                "The Task Instance with dag_id: `example_python_operator`, "
+                "run_id: `TEST_DAG_RUN_ID` and task_id: `print_the_context` was not found"
+            )
+        }
+
+    def test_raises_404_for_mapped_task_instance_with_multiple_indexes(self, test_client, session):
+        tis = self.create_task_instances(session)
+        self._replace_with_mapped_copies(session, tis[0], range(3))
+
+        response = test_client.get(self.TASK_INSTANCE_URL)
+
+        assert response.status_code == 404
+        assert response.json() == {"detail": self.MAPPED_TI_DETAIL}
+
+    def test_raises_404_for_mapped_task_instance_with_one_index(self, test_client, session):
+        tis = self.create_task_instances(session)
+        self._replace_with_mapped_copies(session, tis[0], [2])
+
+        response = test_client.get(self.TASK_INSTANCE_URL)
+
+        assert response.status_code == 404
+        assert response.json() == {"detail": self.MAPPED_TI_DETAIL}