This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c2959c9598 Add executor field to the task instance API (#40034)
c2959c9598 is described below
commit c2959c9598d258c5f41eecfc3666f41830dda248
Author: Niko Oliveira <[email protected]>
AuthorDate: Fri Jun 14 07:49:28 2024 -0700
Add executor field to the task instance API (#40034)
Return the executor as part of TaskInstance API responses and also enable
filtering task instances by the executor field.
Also use these changes to display the executor field on the TaskInstance
Details web page.
Co-authored-by: Vincent <[email protected]>
---
.../endpoints/task_instance_endpoint.py | 5 +++
airflow/api_connexion/openapi/v1.yaml | 27 ++++++++++++-
.../api_connexion/schemas/task_instance_schema.py | 2 +
.../static/js/dag/details/taskInstance/Details.tsx | 7 ++++
airflow/www/static/js/types/api-generated.ts | 17 +++++++++
.../test_mapped_task_instance_endpoint.py | 21 ++++++++++-
.../endpoints/test_task_instance_endpoint.py | 44 ++++++++++++++++++++++
.../schemas/test_task_instance_schema.py | 2 +
8 files changed, 123 insertions(+), 2 deletions(-)
diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py
b/airflow/api_connexion/endpoints/task_instance_endpoint.py
index 70bebcb1b3..9919162262 100644
--- a/airflow/api_connexion/endpoints/task_instance_endpoint.py
+++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py
@@ -173,6 +173,7 @@ def get_mapped_task_instances(
state: list[str] | None = None,
pool: list[str] | None = None,
queue: list[str] | None = None,
+ executor: list[str] | None = None,
limit: int | None = None,
offset: int | None = None,
order_by: str | None = None,
@@ -221,6 +222,7 @@ def get_mapped_task_instances(
base_query = _apply_array_filter(base_query, key=TI.state, values=states)
base_query = _apply_array_filter(base_query, key=TI.pool, values=pool)
base_query = _apply_array_filter(base_query, key=TI.queue, values=queue)
+ base_query = _apply_array_filter(base_query, key=TI.executor,
values=executor)
# Count elements before joining extra columns
total_entries = get_query_count(base_query, session=session)
@@ -323,6 +325,7 @@ def get_task_instances(
state: list[str] | None = None,
pool: list[str] | None = None,
queue: list[str] | None = None,
+ executor: list[str] | None = None,
offset: int | None = None,
session: Session = NEW_SESSION,
) -> APIResponse:
@@ -354,6 +357,7 @@ def get_task_instances(
base_query = _apply_array_filter(base_query, key=TI.state, values=states)
base_query = _apply_array_filter(base_query, key=TI.pool, values=pool)
base_query = _apply_array_filter(base_query, key=TI.queue, values=queue)
+ base_query = _apply_array_filter(base_query, key=TI.executor,
values=executor)
# Count elements before joining extra columns
total_entries = get_query_count(base_query, session=session)
@@ -428,6 +432,7 @@ def get_task_instances_batch(session: Session =
NEW_SESSION) -> APIResponse:
base_query = _apply_array_filter(base_query, key=TI.state, values=states)
base_query = _apply_array_filter(base_query, key=TI.pool,
values=data["pool"])
base_query = _apply_array_filter(base_query, key=TI.queue,
values=data["queue"])
+ base_query = _apply_array_filter(base_query, key=TI.executor,
values=data["executor"])
# Count elements before joining extra columns
total_entries = get_query_count(base_query, session=session)
diff --git a/airflow/api_connexion/openapi/v1.yaml
b/airflow/api_connexion/openapi/v1.yaml
index c9b5a9d808..273d69ab70 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -1503,6 +1503,7 @@ paths:
- $ref: "#/components/parameters/FilterState"
- $ref: "#/components/parameters/FilterPool"
- $ref: "#/components/parameters/FilterQueue"
+ - $ref: "#/components/parameters/FilterExecutor"
get:
summary: List task instances
description: >
@@ -1671,6 +1672,7 @@ paths:
- $ref: "#/components/parameters/FilterState"
- $ref: "#/components/parameters/FilterPool"
- $ref: "#/components/parameters/FilterQueue"
+ - $ref: "#/components/parameters/FilterExecutor"
- $ref: "#/components/parameters/OrderBy"
responses:
"200":
@@ -3751,6 +3753,13 @@ components:
pid:
type: integer
nullable: true
+ executor:
+ type: string
+ nullable: true
+ description: |
+ Executor the task is configured to run on or None (which indicates
the default executor)
+
+ *New in version 2.10.0*
executor_config:
type: string
sla_miss:
@@ -4090,6 +4099,10 @@ components:
type: string
readOnly: true
nullable: true
+ executor:
+ type: string
+ readOnly: true
+ nullable: true
pool:
type: string
readOnly: true
@@ -4951,6 +4964,11 @@ components:
items:
type: string
description: The value can be repeated to retrieve multiple matching
values (OR condition).
+ executor:
+ type: array
+ items:
+ type: string
+ description: The value can be repeated to retrieve multiple matching
values (OR condition).
# Common data type
ScheduleInterval:
@@ -5507,7 +5525,14 @@ components:
items:
type: string
description: The value can be repeated to retrieve multiple matching
values (OR condition).
-
+ FilterExecutor:
+ in: query
+ name: executor
+ schema:
+ type: array
+ items:
+ type: string
+ description: The value can be repeated to retrieve multiple matching
values (OR condition).
FilterTags:
in: query
name: tags
diff --git a/airflow/api_connexion/schemas/task_instance_schema.py
b/airflow/api_connexion/schemas/task_instance_schema.py
index 04c2edc49c..8ef2987c88 100644
--- a/airflow/api_connexion/schemas/task_instance_schema.py
+++ b/airflow/api_connexion/schemas/task_instance_schema.py
@@ -65,6 +65,7 @@ class TaskInstanceSchema(SQLAlchemySchema):
operator = auto_field()
queued_dttm = auto_field(data_key="queued_when")
pid = auto_field()
+ executor = auto_field()
executor_config = auto_field()
note = auto_field()
sla_miss = fields.Nested(SlaMissSchema, dump_default=None)
@@ -118,6 +119,7 @@ class TaskInstanceBatchFormSchema(Schema):
state = fields.List(fields.Str(allow_none=True), load_default=None)
pool = fields.List(fields.Str(), load_default=None)
queue = fields.List(fields.Str(), load_default=None)
+ executor = fields.List(fields.Str(), load_default=None)
class ClearTaskInstanceFormSchema(Schema):
diff --git a/airflow/www/static/js/dag/details/taskInstance/Details.tsx
b/airflow/www/static/js/dag/details/taskInstance/Details.tsx
index 80e26d79ca..17d5f13baa 100644
--- a/airflow/www/static/js/dag/details/taskInstance/Details.tsx
+++ b/airflow/www/static/js/dag/details/taskInstance/Details.tsx
@@ -53,6 +53,7 @@ const Details = ({ gridInstance, taskInstance, group }:
Props) => {
const endDate = gridInstance?.endDate || taskInstance?.endDate;
const taskId = gridInstance?.taskId || taskInstance?.taskId;
const mapIndex = gridInstance?.mapIndex || taskInstance?.mapIndex;
+ const executor = taskInstance?.executor || "<default>";
const operator = group?.operator || taskInstance?.operator;
@@ -243,6 +244,12 @@ const Details = ({ gridInstance, taskInstance, group }:
Props) => {
<Td>{taskInstance.poolSlots}</Td>
</Tr>
)}
+ {executor && (
+ <Tr>
+ <Td>Executor</Td>
+ <Td>{executor}</Td>
+ </Tr>
+ )}
{!!taskInstance?.executorConfig && (
<Tr>
<Td>Executor Config</Td>
diff --git a/airflow/www/static/js/types/api-generated.ts
b/airflow/www/static/js/types/api-generated.ts
index a04c1cd0e0..1b82d07835 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -474,6 +474,8 @@ export interface paths {
pool?: components["parameters"]["FilterPool"];
/** The value can be repeated to retrieve multiple matching values (OR
condition). */
queue?: components["parameters"]["FilterQueue"];
+ /** The value can be repeated to retrieve multiple matching values (OR
condition). */
+ executor?: components["parameters"]["FilterExecutor"];
};
};
};
@@ -1497,6 +1499,12 @@ export interface components {
/** @description The datetime that the task enter the state QUEUE, also
known as queue_at */
queued_when?: string | null;
pid?: number | null;
+ /**
+ * @description Executor the task is configured to run on or None (which
indicates the default executor)
+ *
+ * *New in version 2.10.0*
+ */
+ executor?: string | null;
executor_config?: string;
sla_miss?: components["schemas"]["SLAMiss"];
/**
@@ -1688,6 +1696,7 @@ export interface components {
wait_for_downstream?: boolean;
retries?: number;
queue?: string | null;
+ executor?: string | null;
pool?: string;
pool_slots?: number;
execution_timeout?: components["schemas"]["TimeDelta"];
@@ -2239,6 +2248,8 @@ export interface components {
pool?: string[];
/** @description The value can be repeated to retrieve multiple matching
values (OR condition). */
queue?: string[];
+ /** @description The value can be repeated to retrieve multiple matching
values (OR condition). */
+ executor?: string[];
};
/**
* @description Schedule interval. Defines how often DAG runs, this object
gets added to your latest task instance's
@@ -2567,6 +2578,8 @@ export interface components {
FilterPool: string[];
/** @description The value can be repeated to retrieve multiple matching
values (OR condition). */
FilterQueue: string[];
+ /** @description The value can be repeated to retrieve multiple matching
values (OR condition). */
+ FilterExecutor: string[];
/**
* @description List of tags to filter results.
*
@@ -3970,6 +3983,8 @@ export interface operations {
pool?: components["parameters"]["FilterPool"];
/** The value can be repeated to retrieve multiple matching values (OR
condition). */
queue?: components["parameters"]["FilterQueue"];
+ /** The value can be repeated to retrieve multiple matching values (OR
condition). */
+ executor?: components["parameters"]["FilterExecutor"];
/** The numbers of items to return. */
limit?: components["parameters"]["PageLimit"];
/** The number of items to skip before starting to collect the result
set. */
@@ -4198,6 +4213,8 @@ export interface operations {
pool?: components["parameters"]["FilterPool"];
/** The value can be repeated to retrieve multiple matching values (OR
condition). */
queue?: components["parameters"]["FilterQueue"];
+ /** The value can be repeated to retrieve multiple matching values (OR
condition). */
+ executor?: components["parameters"]["FilterExecutor"];
/**
* The name of the field to order the results by.
* Prefix a field name with `-` to reverse the sort order.
diff --git
a/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
b/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
index 8d5c854eb4..78054f379e 100644
--- a/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
@@ -98,7 +98,7 @@ class TestMappedTaskInstanceEndpoint:
count = dag["success"] + dag["running"]
with dag_maker(session=session, dag_id=dag_id,
start_date=DEFAULT_DATETIME_1):
task1 = BaseOperator(task_id="op1")
- mapped =
MockOperator.partial(task_id="task_2").expand(arg2=task1.output)
+ mapped = MockOperator.partial(task_id="task_2",
executor="default").expand(arg2=task1.output)
dr = dag_maker.create_dagrun(run_id=f"run_{dag_id}")
@@ -221,6 +221,7 @@ class
TestGetMappedTaskInstance(TestMappedTaskInstanceEndpoint):
"duration": None,
"end_date": None,
"execution_date": "2020-01-01T00:00:00+00:00",
+ "executor": "default",
"executor_config": "{}",
"hostname": "",
"map_index": 0,
@@ -453,6 +454,24 @@ class
TestGetMappedTaskInstances(TestMappedTaskInstanceEndpoint):
assert response.json["total_entries"] == 0
assert response.json["task_instances"] == []
+ @provide_session
+ def test_mapped_task_instances_with_executor(self,
one_task_with_mapped_tis, session):
+ response = self.client.get(
+
"/api/v1/dags/mapped_tis/dagRuns/run_mapped_tis/taskInstances/task_2/listMapped?executor=default",
+ environ_overrides={"REMOTE_USER": "test"},
+ )
+ assert response.status_code == 200
+ assert response.json["total_entries"] == 3
+ assert len(response.json["task_instances"]) == 3
+
+ response = self.client.get(
+
"/api/v1/dags/mapped_tis/dagRuns/run_mapped_tis/taskInstances/task_2/listMapped?executor=no_exec",
+ environ_overrides={"REMOTE_USER": "test"},
+ )
+ assert response.status_code == 200
+ assert response.json["total_entries"] == 0
+ assert response.json["task_instances"] == []
+
@provide_session
def test_mapped_task_instances_with_zero_mapped(self,
one_task_with_zero_mapped_tis, session):
response = self.client.get(
diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
index c3305e95b5..62ae45c1bd 100644
--- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
@@ -227,6 +227,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
"duration": 10000.0,
"end_date": "2020-01-03T00:00:00+00:00",
"execution_date": "2020-01-01T00:00:00+00:00",
+ "executor": None,
"executor_config": "{}",
"hostname": "",
"map_index": -1,
@@ -285,6 +286,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
"duration": 10000.0,
"end_date": "2020-01-03T00:00:00+00:00",
"execution_date": "2020-01-01T00:00:00+00:00",
+ "executor": None,
"executor_config": "{}",
"hostname": "",
"map_index": -1,
@@ -332,6 +334,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
"duration": 10000.0,
"end_date": "2020-01-03T00:00:00+00:00",
"execution_date": "2020-01-01T00:00:00+00:00",
+ "executor": None,
"executor_config": "{}",
"hostname": "",
"map_index": -1,
@@ -382,6 +385,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
"duration": 10000.0,
"end_date": "2020-01-03T00:00:00+00:00",
"execution_date": "2020-01-01T00:00:00+00:00",
+ "executor": None,
"executor_config": "{}",
"hostname": "",
"map_index": -1,
@@ -442,6 +446,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
"duration": 10000.0,
"end_date": "2020-01-03T00:00:00+00:00",
"execution_date": "2020-01-01T00:00:00+00:00",
+ "executor": None,
"executor_config": "{}",
"hostname": "",
"map_index": map_index,
@@ -667,6 +672,31 @@ class TestGetTaskInstances(TestTaskInstanceEndpoint):
2,
id="test queue filter ~",
),
+ pytest.param(
+ [
+ {"executor": "test_exec_1"},
+ {"executor": "test_exec_2"},
+ {"executor": "test_exec_3"},
+ ],
+ True,
+ (
+
"/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID"
+ "/taskInstances?executor=test_exec_1,test_exec_2"
+ ),
+ 2,
+ id="test_executor_filter",
+ ),
+ pytest.param(
+ [
+ {"executor": "test_exec_1"},
+ {"executor": "test_exec_2"},
+ {"executor": "test_exec_3"},
+ ],
+ True,
+
"/api/v1/dags/~/dagRuns/~/taskInstances?executor=test_exec_1,test_exec_2",
+ 2,
+ id="test executor filter ~",
+ ),
],
)
def test_should_respond_200(self, task_instances, update_extras, url,
expected_ti, session):
@@ -769,6 +799,18 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint):
"test",
id="test queue filter",
),
+ pytest.param(
+ [
+ {"executor": "test_exec_1"},
+ {"executor": "test_exec_2"},
+ {"executor": "test_exec_3"},
+ ],
+ True,
+ {"executor": ["test_exec_1", "test_exec_2"]},
+ 2,
+ "test",
+ id="test executor filter",
+ ),
pytest.param(
[
{"pool": "test_pool_1"},
@@ -2367,6 +2409,7 @@ class TestSetTaskInstanceNote(TestTaskInstanceEndpoint):
"duration": 10000.0,
"end_date": "2020-01-03T00:00:00+00:00",
"execution_date": "2020-01-01T00:00:00+00:00",
+ "executor": None,
"executor_config": "{}",
"hostname": "",
"map_index": -1,
@@ -2426,6 +2469,7 @@ class TestSetTaskInstanceNote(TestTaskInstanceEndpoint):
"duration": 10000.0,
"end_date": "2020-01-03T00:00:00+00:00",
"execution_date": "2020-01-01T00:00:00+00:00",
+ "executor": None,
"executor_config": "{}",
"hostname": "",
"map_index": map_index,
diff --git a/tests/api_connexion/schemas/test_task_instance_schema.py
b/tests/api_connexion/schemas/test_task_instance_schema.py
index c327e182fa..0ff10a7737 100644
--- a/tests/api_connexion/schemas/test_task_instance_schema.py
+++ b/tests/api_connexion/schemas/test_task_instance_schema.py
@@ -74,6 +74,7 @@ class TestTaskInstanceSchema:
"duration": 10000.0,
"end_date": "2020-01-03T00:00:00+00:00",
"execution_date": "2020-01-01T00:00:00+00:00",
+ "executor": None,
"executor_config": "{}",
"hostname": "",
"map_index": -1,
@@ -122,6 +123,7 @@ class TestTaskInstanceSchema:
"duration": 10000.0,
"end_date": "2020-01-03T00:00:00+00:00",
"execution_date": "2020-01-01T00:00:00+00:00",
+ "executor": None,
"executor_config": "{}",
"hostname": "",
"map_index": -1,