This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c2959c9598 Add executor field to the task instance API (#40034)
c2959c9598 is described below

commit c2959c9598d258c5f41eecfc3666f41830dda248
Author: Niko Oliveira <[email protected]>
AuthorDate: Fri Jun 14 07:49:28 2024 -0700

    Add executor field to the task instance API (#40034)
    
    Return executor as part of TaskInstance queries and also enable
    filtering by executor field.
    
    Also use the changes to display the executor field on the TaskInstance
    Details web page.
    
    Co-authored-by: Vincent <[email protected]>
---
 .../endpoints/task_instance_endpoint.py            |  5 +++
 airflow/api_connexion/openapi/v1.yaml              | 27 ++++++++++++-
 .../api_connexion/schemas/task_instance_schema.py  |  2 +
 .../static/js/dag/details/taskInstance/Details.tsx |  7 ++++
 airflow/www/static/js/types/api-generated.ts       | 17 +++++++++
 .../test_mapped_task_instance_endpoint.py          | 21 ++++++++++-
 .../endpoints/test_task_instance_endpoint.py       | 44 ++++++++++++++++++++++
 .../schemas/test_task_instance_schema.py           |  2 +
 8 files changed, 123 insertions(+), 2 deletions(-)

diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py 
b/airflow/api_connexion/endpoints/task_instance_endpoint.py
index 70bebcb1b3..9919162262 100644
--- a/airflow/api_connexion/endpoints/task_instance_endpoint.py
+++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py
@@ -173,6 +173,7 @@ def get_mapped_task_instances(
     state: list[str] | None = None,
     pool: list[str] | None = None,
     queue: list[str] | None = None,
+    executor: list[str] | None = None,
     limit: int | None = None,
     offset: int | None = None,
     order_by: str | None = None,
@@ -221,6 +222,7 @@ def get_mapped_task_instances(
     base_query = _apply_array_filter(base_query, key=TI.state, values=states)
     base_query = _apply_array_filter(base_query, key=TI.pool, values=pool)
     base_query = _apply_array_filter(base_query, key=TI.queue, values=queue)
+    base_query = _apply_array_filter(base_query, key=TI.executor, 
values=executor)
 
     # Count elements before joining extra columns
     total_entries = get_query_count(base_query, session=session)
@@ -323,6 +325,7 @@ def get_task_instances(
     state: list[str] | None = None,
     pool: list[str] | None = None,
     queue: list[str] | None = None,
+    executor: list[str] | None = None,
     offset: int | None = None,
     session: Session = NEW_SESSION,
 ) -> APIResponse:
@@ -354,6 +357,7 @@ def get_task_instances(
     base_query = _apply_array_filter(base_query, key=TI.state, values=states)
     base_query = _apply_array_filter(base_query, key=TI.pool, values=pool)
     base_query = _apply_array_filter(base_query, key=TI.queue, values=queue)
+    base_query = _apply_array_filter(base_query, key=TI.executor, 
values=executor)
 
     # Count elements before joining extra columns
     total_entries = get_query_count(base_query, session=session)
@@ -428,6 +432,7 @@ def get_task_instances_batch(session: Session = 
NEW_SESSION) -> APIResponse:
     base_query = _apply_array_filter(base_query, key=TI.state, values=states)
     base_query = _apply_array_filter(base_query, key=TI.pool, 
values=data["pool"])
     base_query = _apply_array_filter(base_query, key=TI.queue, 
values=data["queue"])
+    base_query = _apply_array_filter(base_query, key=TI.executor, 
values=data["executor"])
 
     # Count elements before joining extra columns
     total_entries = get_query_count(base_query, session=session)
diff --git a/airflow/api_connexion/openapi/v1.yaml 
b/airflow/api_connexion/openapi/v1.yaml
index c9b5a9d808..273d69ab70 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -1503,6 +1503,7 @@ paths:
       - $ref: "#/components/parameters/FilterState"
       - $ref: "#/components/parameters/FilterPool"
       - $ref: "#/components/parameters/FilterQueue"
+      - $ref: "#/components/parameters/FilterExecutor"
     get:
       summary: List task instances
       description: >
@@ -1671,6 +1672,7 @@ paths:
         - $ref: "#/components/parameters/FilterState"
         - $ref: "#/components/parameters/FilterPool"
         - $ref: "#/components/parameters/FilterQueue"
+        - $ref: "#/components/parameters/FilterExecutor"
         - $ref: "#/components/parameters/OrderBy"
       responses:
         "200":
@@ -3751,6 +3753,13 @@ components:
         pid:
           type: integer
           nullable: true
+        executor:
+          type: string
+          nullable: true
+          description: |
+            Executor the task is configured to run on or None (which indicates 
the default executor)
+
+            *New in version 2.10.0*
         executor_config:
           type: string
         sla_miss:
@@ -4090,6 +4099,10 @@ components:
           type: string
           readOnly: true
           nullable: true
+        executor:
+          type: string
+          readOnly: true
+          nullable: true
         pool:
           type: string
           readOnly: true
@@ -4951,6 +4964,11 @@ components:
           items:
             type: string
           description: The value can be repeated to retrieve multiple matching 
values (OR condition).
+        executor:
+          type: array
+          items:
+            type: string
+          description: The value can be repeated to retrieve multiple matching 
values (OR condition).
 
     # Common data type
     ScheduleInterval:
@@ -5507,7 +5525,14 @@ components:
         items:
           type: string
       description: The value can be repeated to retrieve multiple matching 
values (OR condition).
-
+    FilterExecutor:
+      in: query
+      name: executor
+      schema:
+        type: array
+        items:
+          type: string
+      description: The value can be repeated to retrieve multiple matching 
values (OR condition).
     FilterTags:
       in: query
       name: tags
diff --git a/airflow/api_connexion/schemas/task_instance_schema.py 
b/airflow/api_connexion/schemas/task_instance_schema.py
index 04c2edc49c..8ef2987c88 100644
--- a/airflow/api_connexion/schemas/task_instance_schema.py
+++ b/airflow/api_connexion/schemas/task_instance_schema.py
@@ -65,6 +65,7 @@ class TaskInstanceSchema(SQLAlchemySchema):
     operator = auto_field()
     queued_dttm = auto_field(data_key="queued_when")
     pid = auto_field()
+    executor = auto_field()
     executor_config = auto_field()
     note = auto_field()
     sla_miss = fields.Nested(SlaMissSchema, dump_default=None)
@@ -118,6 +119,7 @@ class TaskInstanceBatchFormSchema(Schema):
     state = fields.List(fields.Str(allow_none=True), load_default=None)
     pool = fields.List(fields.Str(), load_default=None)
     queue = fields.List(fields.Str(), load_default=None)
+    executor = fields.List(fields.Str(), load_default=None)
 
 
 class ClearTaskInstanceFormSchema(Schema):
diff --git a/airflow/www/static/js/dag/details/taskInstance/Details.tsx 
b/airflow/www/static/js/dag/details/taskInstance/Details.tsx
index 80e26d79ca..17d5f13baa 100644
--- a/airflow/www/static/js/dag/details/taskInstance/Details.tsx
+++ b/airflow/www/static/js/dag/details/taskInstance/Details.tsx
@@ -53,6 +53,7 @@ const Details = ({ gridInstance, taskInstance, group }: 
Props) => {
   const endDate = gridInstance?.endDate || taskInstance?.endDate;
   const taskId = gridInstance?.taskId || taskInstance?.taskId;
   const mapIndex = gridInstance?.mapIndex || taskInstance?.mapIndex;
+  const executor = taskInstance?.executor || "<default>";
 
   const operator = group?.operator || taskInstance?.operator;
 
@@ -243,6 +244,12 @@ const Details = ({ gridInstance, taskInstance, group }: 
Props) => {
               <Td>{taskInstance.poolSlots}</Td>
             </Tr>
           )}
+          {executor && (
+            <Tr>
+              <Td>Executor</Td>
+              <Td>{executor}</Td>
+            </Tr>
+          )}
           {!!taskInstance?.executorConfig && (
             <Tr>
               <Td>Executor Config</Td>
diff --git a/airflow/www/static/js/types/api-generated.ts 
b/airflow/www/static/js/types/api-generated.ts
index a04c1cd0e0..1b82d07835 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -474,6 +474,8 @@ export interface paths {
         pool?: components["parameters"]["FilterPool"];
         /** The value can be repeated to retrieve multiple matching values (OR 
condition). */
         queue?: components["parameters"]["FilterQueue"];
+        /** The value can be repeated to retrieve multiple matching values (OR 
condition). */
+        executor?: components["parameters"]["FilterExecutor"];
       };
     };
   };
@@ -1497,6 +1499,12 @@ export interface components {
       /** @description The datetime that the task enter the state QUEUE, also 
known as queue_at */
       queued_when?: string | null;
       pid?: number | null;
+      /**
+       * @description Executor the task is configured to run on or None (which 
indicates the default executor)
+       *
+       * *New in version 2.10.0*
+       */
+      executor?: string | null;
       executor_config?: string;
       sla_miss?: components["schemas"]["SLAMiss"];
       /**
@@ -1688,6 +1696,7 @@ export interface components {
       wait_for_downstream?: boolean;
       retries?: number;
       queue?: string | null;
+      executor?: string | null;
       pool?: string;
       pool_slots?: number;
       execution_timeout?: components["schemas"]["TimeDelta"];
@@ -2239,6 +2248,8 @@ export interface components {
       pool?: string[];
       /** @description The value can be repeated to retrieve multiple matching 
values (OR condition). */
       queue?: string[];
+      /** @description The value can be repeated to retrieve multiple matching 
values (OR condition). */
+      executor?: string[];
     };
     /**
      * @description Schedule interval. Defines how often DAG runs, this object 
gets added to your latest task instance's
@@ -2567,6 +2578,8 @@ export interface components {
     FilterPool: string[];
     /** @description The value can be repeated to retrieve multiple matching 
values (OR condition). */
     FilterQueue: string[];
+    /** @description The value can be repeated to retrieve multiple matching 
values (OR condition). */
+    FilterExecutor: string[];
     /**
      * @description List of tags to filter results.
      *
@@ -3970,6 +3983,8 @@ export interface operations {
         pool?: components["parameters"]["FilterPool"];
         /** The value can be repeated to retrieve multiple matching values (OR 
condition). */
         queue?: components["parameters"]["FilterQueue"];
+        /** The value can be repeated to retrieve multiple matching values (OR 
condition). */
+        executor?: components["parameters"]["FilterExecutor"];
         /** The numbers of items to return. */
         limit?: components["parameters"]["PageLimit"];
         /** The number of items to skip before starting to collect the result 
set. */
@@ -4198,6 +4213,8 @@ export interface operations {
         pool?: components["parameters"]["FilterPool"];
         /** The value can be repeated to retrieve multiple matching values (OR 
condition). */
         queue?: components["parameters"]["FilterQueue"];
+        /** The value can be repeated to retrieve multiple matching values (OR 
condition). */
+        executor?: components["parameters"]["FilterExecutor"];
         /**
          * The name of the field to order the results by.
          * Prefix a field name with `-` to reverse the sort order.
diff --git 
a/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py 
b/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
index 8d5c854eb4..78054f379e 100644
--- a/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_mapped_task_instance_endpoint.py
@@ -98,7 +98,7 @@ class TestMappedTaskInstanceEndpoint:
             count = dag["success"] + dag["running"]
             with dag_maker(session=session, dag_id=dag_id, 
start_date=DEFAULT_DATETIME_1):
                 task1 = BaseOperator(task_id="op1")
-                mapped = 
MockOperator.partial(task_id="task_2").expand(arg2=task1.output)
+                mapped = MockOperator.partial(task_id="task_2", 
executor="default").expand(arg2=task1.output)
 
             dr = dag_maker.create_dagrun(run_id=f"run_{dag_id}")
 
@@ -221,6 +221,7 @@ class 
TestGetMappedTaskInstance(TestMappedTaskInstanceEndpoint):
             "duration": None,
             "end_date": None,
             "execution_date": "2020-01-01T00:00:00+00:00",
+            "executor": "default",
             "executor_config": "{}",
             "hostname": "",
             "map_index": 0,
@@ -453,6 +454,24 @@ class 
TestGetMappedTaskInstances(TestMappedTaskInstanceEndpoint):
         assert response.json["total_entries"] == 0
         assert response.json["task_instances"] == []
 
+    @provide_session
+    def test_mapped_task_instances_with_executor(self, 
one_task_with_mapped_tis, session):
+        response = self.client.get(
+            
"/api/v1/dags/mapped_tis/dagRuns/run_mapped_tis/taskInstances/task_2/listMapped?executor=default",
+            environ_overrides={"REMOTE_USER": "test"},
+        )
+        assert response.status_code == 200
+        assert response.json["total_entries"] == 3
+        assert len(response.json["task_instances"]) == 3
+
+        response = self.client.get(
+            
"/api/v1/dags/mapped_tis/dagRuns/run_mapped_tis/taskInstances/task_2/listMapped?executor=no_exec",
+            environ_overrides={"REMOTE_USER": "test"},
+        )
+        assert response.status_code == 200
+        assert response.json["total_entries"] == 0
+        assert response.json["task_instances"] == []
+
     @provide_session
     def test_mapped_task_instances_with_zero_mapped(self, 
one_task_with_zero_mapped_tis, session):
         response = self.client.get(
diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py 
b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
index c3305e95b5..62ae45c1bd 100644
--- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
@@ -227,6 +227,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
             "duration": 10000.0,
             "end_date": "2020-01-03T00:00:00+00:00",
             "execution_date": "2020-01-01T00:00:00+00:00",
+            "executor": None,
             "executor_config": "{}",
             "hostname": "",
             "map_index": -1,
@@ -285,6 +286,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
             "duration": 10000.0,
             "end_date": "2020-01-03T00:00:00+00:00",
             "execution_date": "2020-01-01T00:00:00+00:00",
+            "executor": None,
             "executor_config": "{}",
             "hostname": "",
             "map_index": -1,
@@ -332,6 +334,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
             "duration": 10000.0,
             "end_date": "2020-01-03T00:00:00+00:00",
             "execution_date": "2020-01-01T00:00:00+00:00",
+            "executor": None,
             "executor_config": "{}",
             "hostname": "",
             "map_index": -1,
@@ -382,6 +385,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
             "duration": 10000.0,
             "end_date": "2020-01-03T00:00:00+00:00",
             "execution_date": "2020-01-01T00:00:00+00:00",
+            "executor": None,
             "executor_config": "{}",
             "hostname": "",
             "map_index": -1,
@@ -442,6 +446,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
                 "duration": 10000.0,
                 "end_date": "2020-01-03T00:00:00+00:00",
                 "execution_date": "2020-01-01T00:00:00+00:00",
+                "executor": None,
                 "executor_config": "{}",
                 "hostname": "",
                 "map_index": map_index,
@@ -667,6 +672,31 @@ class TestGetTaskInstances(TestTaskInstanceEndpoint):
                 2,
                 id="test queue filter ~",
             ),
+            pytest.param(
+                [
+                    {"executor": "test_exec_1"},
+                    {"executor": "test_exec_2"},
+                    {"executor": "test_exec_3"},
+                ],
+                True,
+                (
+                    
"/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID"
+                    "/taskInstances?executor=test_exec_1,test_exec_2"
+                ),
+                2,
+                id="test_executor_filter",
+            ),
+            pytest.param(
+                [
+                    {"executor": "test_exec_1"},
+                    {"executor": "test_exec_2"},
+                    {"executor": "test_exec_3"},
+                ],
+                True,
+                
"/api/v1/dags/~/dagRuns/~/taskInstances?executor=test_exec_1,test_exec_2",
+                2,
+                id="test executor filter ~",
+            ),
         ],
     )
     def test_should_respond_200(self, task_instances, update_extras, url, 
expected_ti, session):
@@ -769,6 +799,18 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint):
                 "test",
                 id="test queue filter",
             ),
+            pytest.param(
+                [
+                    {"executor": "test_exec_1"},
+                    {"executor": "test_exec_2"},
+                    {"executor": "test_exec_3"},
+                ],
+                True,
+                {"executor": ["test_exec_1", "test_exec_2"]},
+                2,
+                "test",
+                id="test executor filter",
+            ),
             pytest.param(
                 [
                     {"pool": "test_pool_1"},
@@ -2367,6 +2409,7 @@ class TestSetTaskInstanceNote(TestTaskInstanceEndpoint):
             "duration": 10000.0,
             "end_date": "2020-01-03T00:00:00+00:00",
             "execution_date": "2020-01-01T00:00:00+00:00",
+            "executor": None,
             "executor_config": "{}",
             "hostname": "",
             "map_index": -1,
@@ -2426,6 +2469,7 @@ class TestSetTaskInstanceNote(TestTaskInstanceEndpoint):
                 "duration": 10000.0,
                 "end_date": "2020-01-03T00:00:00+00:00",
                 "execution_date": "2020-01-01T00:00:00+00:00",
+                "executor": None,
                 "executor_config": "{}",
                 "hostname": "",
                 "map_index": map_index,
diff --git a/tests/api_connexion/schemas/test_task_instance_schema.py 
b/tests/api_connexion/schemas/test_task_instance_schema.py
index c327e182fa..0ff10a7737 100644
--- a/tests/api_connexion/schemas/test_task_instance_schema.py
+++ b/tests/api_connexion/schemas/test_task_instance_schema.py
@@ -74,6 +74,7 @@ class TestTaskInstanceSchema:
             "duration": 10000.0,
             "end_date": "2020-01-03T00:00:00+00:00",
             "execution_date": "2020-01-01T00:00:00+00:00",
+            "executor": None,
             "executor_config": "{}",
             "hostname": "",
             "map_index": -1,
@@ -122,6 +123,7 @@ class TestTaskInstanceSchema:
             "duration": 10000.0,
             "end_date": "2020-01-03T00:00:00+00:00",
             "execution_date": "2020-01-01T00:00:00+00:00",
+            "executor": None,
             "executor_config": "{}",
             "hostname": "",
             "map_index": -1,

Reply via email to