This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 08b1e8d749 add dag_run_ids and task_ids filter for the batch task
instance API endpoint (#32705)
08b1e8d749 is described below
commit 08b1e8d749612af469625add5c7f0ad582969c39
Author: Burak Karakan <[email protected]>
AuthorDate: Mon Aug 7 11:35:36 2023 +0300
add dag_run_ids and task_ids filter for the batch task instance API
endpoint (#32705)
* add dag_run_ids and task_ids as filter types for the batch task instance
endpoint
* add version notice to the new filters
* Update the released version in OpenAPI spec
---------
Co-authored-by: pierrejeambrun <[email protected]>
---
.../endpoints/task_instance_endpoint.py | 2 ++
airflow/api_connexion/openapi/v1.yaml | 20 +++++++++++++++
.../api_connexion/schemas/task_instance_schema.py | 2 ++
airflow/www/static/js/types/api-generated.ts | 12 +++++++++
.../endpoints/test_task_instance_endpoint.py | 30 ++++++++++++++++++++++
5 files changed, 66 insertions(+)
diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py
b/airflow/api_connexion/endpoints/task_instance_endpoint.py
index ab4b24f8f2..a18514696b 100644
--- a/airflow/api_connexion/endpoints/task_instance_endpoint.py
+++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py
@@ -400,6 +400,8 @@ def get_task_instances_batch(session: Session =
NEW_SESSION) -> APIResponse:
base_query = select(TI).join(TI.dag_run)
base_query = _apply_array_filter(base_query, key=TI.dag_id,
values=data["dag_ids"])
+ base_query = _apply_array_filter(base_query, key=TI.run_id,
values=data["dag_run_ids"])
+ base_query = _apply_array_filter(base_query, key=TI.task_id,
values=data["task_ids"])
base_query = _apply_range_filter(
base_query,
key=DR.execution_date,
diff --git a/airflow/api_connexion/openapi/v1.yaml
b/airflow/api_connexion/openapi/v1.yaml
index b3b4a0f5f0..da7dca04f6 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -4410,6 +4410,26 @@ components:
Return objects with specific DAG IDs.
The value can be repeated to retrieve multiple matching values (OR
condition).
+ dag_run_ids:
+ type: array
+ items:
+ type: string
+ description:
+ Return objects with specific DAG Run IDs.
+
+ The value can be repeated to retrieve multiple matching values (OR
condition).
+
+ *New in version 2.7.1*
+ task_ids:
+ type: array
+ items:
+ type: string
+ description:
+ Return objects with specific task IDs.
+
+ The value can be repeated to retrieve multiple matching values (OR
condition).
+
+ *New in version 2.7.1*
execution_date_gte:
type: string
format: date-time
diff --git a/airflow/api_connexion/schemas/task_instance_schema.py
b/airflow/api_connexion/schemas/task_instance_schema.py
index c929da3099..a3ce7c6a62 100644
--- a/airflow/api_connexion/schemas/task_instance_schema.py
+++ b/airflow/api_connexion/schemas/task_instance_schema.py
@@ -100,6 +100,8 @@ class TaskInstanceBatchFormSchema(Schema):
page_offset = fields.Int(load_default=0, validate=validate.Range(min=0))
page_limit = fields.Int(load_default=100, validate=validate.Range(min=1))
dag_ids = fields.List(fields.Str(), load_default=None)
+ dag_run_ids = fields.List(fields.Str(), load_default=None)
+ task_ids = fields.List(fields.Str(), load_default=None)
execution_date_gte = fields.DateTime(load_default=None,
validate=validate_istimezone)
execution_date_lte = fields.DateTime(load_default=None,
validate=validate_istimezone)
start_date_gte = fields.DateTime(load_default=None,
validate=validate_istimezone)
diff --git a/airflow/www/static/js/types/api-generated.ts
b/airflow/www/static/js/types/api-generated.ts
index f791db4385..e2d38d0725 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -1983,6 +1983,18 @@ export interface components {
* The value can be repeated to retrieve multiple matching values (OR
condition).
*/
dag_ids?: string[];
+ /**
+ * @description Return objects with specific DAG Run IDs.
+ * The value can be repeated to retrieve multiple matching values (OR
condition).
+ * *New in version 2.7.1*
+ */
+ dag_run_ids?: string[];
+ /**
+ * @description Return objects with specific task IDs.
+ * The value can be repeated to retrieve multiple matching values (OR
condition).
+ * *New in version 2.7.1*
+ */
+ task_ids?: string[];
/**
* Format: date-time
* @description Returns objects greater or equal to the specified date.
diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
index a8c4c8b10a..f39fad6d6e 100644
--- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
@@ -778,6 +778,36 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint):
"test",
id="with execution date filter",
),
+ pytest.param(
+ [
+ {"execution_date": DEFAULT_DATETIME_1},
+ {"execution_date": DEFAULT_DATETIME_1 +
dt.timedelta(days=1)},
+ {"execution_date": DEFAULT_DATETIME_1 +
dt.timedelta(days=2)},
+ {"execution_date": DEFAULT_DATETIME_1 +
dt.timedelta(days=3)},
+ ],
+ False,
+ {
+ "dag_run_ids": ["TEST_DAG_RUN_ID_0", "TEST_DAG_RUN_ID_1"],
+ },
+ 2,
+ "test",
+ id="test dag run id filter",
+ ),
+ pytest.param(
+ [
+ {"execution_date": DEFAULT_DATETIME_1},
+ {"execution_date": DEFAULT_DATETIME_1 +
dt.timedelta(days=1)},
+ {"execution_date": DEFAULT_DATETIME_1 +
dt.timedelta(days=2)},
+ {"execution_date": DEFAULT_DATETIME_1 +
dt.timedelta(days=3)},
+ ],
+ False,
+ {
+ "task_ids": ["print_the_context", "log_sql_query"],
+ },
+ 2,
+ "test",
+ id="test task id filter",
+ ),
],
)
def test_should_respond_200(