This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3189ebee18 Allow filtering event logs by attributes (#34417)
3189ebee18 is described below
commit 3189ebee181beecd5a1243a4998bc355b648dc6b
Author: Đỗ Trọng Hải <[email protected]>
AuthorDate: Wed Sep 27 14:58:38 2023 +0700
Allow filtering event logs by attributes (#34417)
Co-authored-by: Hussein Awala <[email protected]>
---
.../api_connexion/endpoints/event_log_endpoint.py | 22 +++++++++
airflow/api_connexion/openapi/v1.yaml | 56 ++++++++++++++++++++++
airflow/www/static/js/types/api-generated.ts | 24 ++++++++++
.../endpoints/test_event_log_endpoint.py | 41 ++++++++++++++++
4 files changed, 143 insertions(+)
diff --git a/airflow/api_connexion/endpoints/event_log_endpoint.py
b/airflow/api_connexion/endpoints/event_log_endpoint.py
index e195cfdcc2..99ec8eedae 100644
--- a/airflow/api_connexion/endpoints/event_log_endpoint.py
+++ b/airflow/api_connexion/endpoints/event_log_endpoint.py
@@ -30,9 +30,11 @@ from airflow.api_connexion.schemas.event_log_schema import (
)
from airflow.models import Log
from airflow.security import permissions
+from airflow.utils import timezone
from airflow.utils.session import NEW_SESSION, provide_session
if TYPE_CHECKING:
+
from sqlalchemy.orm import Session
from airflow.api_connexion.types import APIResponse
@@ -53,6 +55,12 @@ def get_event_log(*, event_log_id: int, session: Session =
NEW_SESSION) -> APIRe
@provide_session
def get_event_logs(
*,
+ dag_id: str | None = None,
+ task_id: str | None = None,
+ owner: str | None = None,
+ event: str | None = None,
+ before: str | None = None,
+ after: str | None = None,
limit: int,
offset: int | None = None,
order_by: str = "event_log_id",
@@ -72,6 +80,20 @@ def get_event_logs(
]
total_entries = session.scalars(func.count(Log.id)).one()
query = select(Log)
+
+ if dag_id:
+ query = query.where(Log.dag_id == dag_id)
+ if task_id:
+ query = query.where(Log.task_id == task_id)
+ if owner:
+ query = query.where(Log.owner == owner)
+ if event:
+ query = query.where(Log.event == event)
+ if before:
+ query = query.where(Log.dttm < timezone.parse(before))
+ if after:
+ query = query.where(Log.dttm > timezone.parse(after))
+
query = apply_sorting(query, order_by, to_replace, allowed_filter_attrs)
event_logs = session.scalars(query.offset(offset).limit(limit)).all()
return event_log_collection_schema.dump(
diff --git a/airflow/api_connexion/openapi/v1.yaml
b/airflow/api_connexion/openapi/v1.yaml
index af9d0cf8a4..f76cfc7baa 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -983,6 +983,12 @@ paths:
- $ref: '#/components/parameters/PageLimit'
- $ref: '#/components/parameters/PageOffset'
- $ref: '#/components/parameters/OrderBy'
+ - $ref: '#/components/parameters/FilterDAGID'
+ - $ref: '#/components/parameters/FilterTaskID'
+ - $ref: '#/components/parameters/Event'
+ - $ref: '#/components/parameters/Owner'
+ - $ref: '#/components/parameters/Before'
+ - $ref: '#/components/parameters/After'
responses:
'200':
description: Success.
@@ -4810,6 +4816,40 @@ components:
required: true
description: The task ID.
+ Event:
+ in: query
+ name: event
+ schema:
+ type: string
+ required: false
+ description: The name of event log.
+
+ Owner:
+ in: query
+ name: owner
+ schema:
+ type: string
+ required: false
+ description: The owner's name of event log.
+
+ Before:
+ in: query
+ name: before
+ schema:
+ type: string
+ format: date-time
+ required: false
+ description: Timestamp to select event logs occurring before.
+
+ After:
+ in: query
+ name: after
+ schema:
+ type: string
+ format: date-time
+ required: false
+ description: Timestamp to select event logs occurring after.
+
MapIndex:
in: path
name: map_index
@@ -5147,6 +5187,22 @@ components:
required: false
description: Only filter the XCom records which have the provided key.
+ FilterDAGID:
+ in: query
+ name: dag_id
+ schema:
+ type: string
+ required: false
+ description: Returns objects matched by the DAG ID.
+
+ FilterTaskID:
+ in: query
+ name: task_id
+ schema:
+ type: string
+ required: false
+ description: Returns objects matched by the Task ID.
+
# Other parameters
FileToken:
in: path
diff --git a/airflow/www/static/js/types/api-generated.ts
b/airflow/www/static/js/types/api-generated.ts
index 9b86f9c575..118804e290 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -2287,6 +2287,14 @@ export interface components {
DAGID: string;
/** @description The task ID. */
TaskID: string;
+ /** @description The name of event log. */
+ Event: string;
+ /** @description The owner's name of event log. */
+ Owner: string;
+ /** @description Timestamp to select event logs occurring before. */
+ Before: string;
+ /** @description Timestamp to select event logs occurring after. */
+ After: string;
/** @description The map index. */
MapIndex: number;
/** @description The DAG run ID. */
@@ -2424,6 +2432,10 @@ export interface components {
Paused: boolean;
/** @description Only filter the XCom records which have the provided key.
*/
FilterXcomKey: string;
+ /** @description Returns objects matched by the DAG ID. */
+ FilterDAGID: string;
+ /** @description Returns objects matched by the Task ID. */
+ FilterTaskID: string;
/**
* @description The key containing the encrypted path to the file.
Encryption and decryption take place only on
* the server. This prevents the client from reading an non-DAG file. This
also ensures API
@@ -3187,6 +3199,18 @@ export interface operations {
* *New in version 2.1.0*
*/
order_by?: components["parameters"]["OrderBy"];
+ /** Returns objects matched by the DAG ID. */
+ dag_id?: components["parameters"]["FilterDAGID"];
+ /** Returns objects matched by the Task ID. */
+ task_id?: components["parameters"]["FilterTaskID"];
+ /** The name of event log. */
+ event?: components["parameters"]["Event"];
+ /** The owner's name of event log. */
+ owner?: components["parameters"]["Owner"];
+ /** Timestamp to select event logs occurring before. */
+ before?: components["parameters"]["Before"];
+ /** Timestamp to select event logs occurring after. */
+ after?: components["parameters"]["After"];
};
};
responses: {
diff --git a/tests/api_connexion/endpoints/test_event_log_endpoint.py
b/tests/api_connexion/endpoints/test_event_log_endpoint.py
index ee1efba55b..ad154e57fe 100644
--- a/tests/api_connexion/endpoints/test_event_log_endpoint.py
+++ b/tests/api_connexion/endpoints/test_event_log_endpoint.py
@@ -233,6 +233,47 @@ class TestGetEventLogs(TestEventLogEndpoint):
assert_401(response)
+ def test_should_filter_eventlogs_by_allowed_attributes(self,
create_log_model, session):
+ eventlog1 = create_log_model(
+ event="TEST_EVENT_1",
+ dag_id="TEST_DAG_ID_1",
+ task_id="TEST_TASK_ID_1",
+ owner="TEST_OWNER_1",
+ when=self.default_time,
+ )
+ eventlog2 = create_log_model(
+ event="TEST_EVENT_2",
+ dag_id="TEST_DAG_ID_2",
+ task_id="TEST_TASK_ID_2",
+ owner="TEST_OWNER_2",
+ when=self.default_time_2,
+ )
+ session.add_all([eventlog1, eventlog2])
+ session.commit()
+ for attr in ["dag_id", "task_id", "owner", "event"]:
+ attr_value = f"TEST_{attr}_1".upper()
+ response = self.client.get(
+ f"/api/v1/eventLogs?{attr}={attr_value}",
environ_overrides={"REMOTE_USER": "test"}
+ )
+ assert response.status_code == 200
+ assert {eventlog[attr] for eventlog in
response.json["event_logs"]} == {attr_value}
+
+ def test_should_filter_eventlogs_by_when(self, create_log_model, session):
+ eventlog1 = create_log_model(event="TEST_EVENT_1",
when=self.default_time)
+ eventlog2 = create_log_model(event="TEST_EVENT_2",
when=self.default_time_2)
+ session.add_all([eventlog1, eventlog2])
+ session.commit()
+ for when_attr, expected_eventlogs in {
+ "before": {"TEST_EVENT_1"},
+ "after": {"TEST_EVENT_2"},
+ }.items():
+ response = self.client.get(
+
f"/api/v1/eventLogs?{when_attr}=2020-06-10T20%3A00%3A01%2B00%3A00", #
self.default_time + 1s
+ environ_overrides={"REMOTE_USER": "test"},
+ )
+ assert response.status_code == 200
+ assert {eventlog["event"] for eventlog in
response.json["event_logs"]} == expected_eventlogs
+
class TestGetEventLogPagination(TestEventLogEndpoint):
@pytest.mark.parametrize(