This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3189ebee18 Allow filtering event logs by attributes (#34417)
3189ebee18 is described below

commit 3189ebee181beecd5a1243a4998bc355b648dc6b
Author: Đỗ Trọng Hải <[email protected]>
AuthorDate: Wed Sep 27 14:58:38 2023 +0700

    Allow filtering event logs by attributes (#34417)
    
    Co-authored-by: Hussein Awala <[email protected]>
---
 .../api_connexion/endpoints/event_log_endpoint.py  | 22 +++++++++
 airflow/api_connexion/openapi/v1.yaml              | 56 ++++++++++++++++++++++
 airflow/www/static/js/types/api-generated.ts       | 24 ++++++++++
 .../endpoints/test_event_log_endpoint.py           | 41 ++++++++++++++++
 4 files changed, 143 insertions(+)

diff --git a/airflow/api_connexion/endpoints/event_log_endpoint.py b/airflow/api_connexion/endpoints/event_log_endpoint.py
index e195cfdcc2..99ec8eedae 100644
--- a/airflow/api_connexion/endpoints/event_log_endpoint.py
+++ b/airflow/api_connexion/endpoints/event_log_endpoint.py
@@ -30,9 +30,11 @@ from airflow.api_connexion.schemas.event_log_schema import (
 )
 from airflow.models import Log
 from airflow.security import permissions
+from airflow.utils import timezone
 from airflow.utils.session import NEW_SESSION, provide_session
 
 if TYPE_CHECKING:
+
     from sqlalchemy.orm import Session
 
     from airflow.api_connexion.types import APIResponse
@@ -53,6 +55,12 @@ def get_event_log(*, event_log_id: int, session: Session = NEW_SESSION) -> APIRe
 @provide_session
 def get_event_logs(
     *,
+    dag_id: str | None = None,
+    task_id: str | None = None,
+    owner: str | None = None,
+    event: str | None = None,
+    before: str | None = None,
+    after: str | None = None,
     limit: int,
     offset: int | None = None,
     order_by: str = "event_log_id",
@@ -72,6 +80,20 @@ def get_event_logs(
     ]
     total_entries = session.scalars(func.count(Log.id)).one()
     query = select(Log)
+
+    if dag_id:
+        query = query.where(Log.dag_id == dag_id)
+    if task_id:
+        query = query.where(Log.task_id == task_id)
+    if owner:
+        query = query.where(Log.owner == owner)
+    if event:
+        query = query.where(Log.event == event)
+    if before:
+        query = query.where(Log.dttm < timezone.parse(before))
+    if after:
+        query = query.where(Log.dttm > timezone.parse(after))
+
     query = apply_sorting(query, order_by, to_replace, allowed_filter_attrs)
     event_logs = session.scalars(query.offset(offset).limit(limit)).all()
     return event_log_collection_schema.dump(
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index af9d0cf8a4..f76cfc7baa 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -983,6 +983,12 @@ paths:
         - $ref: '#/components/parameters/PageLimit'
         - $ref: '#/components/parameters/PageOffset'
         - $ref: '#/components/parameters/OrderBy'
+        - $ref: '#/components/parameters/FilterDAGID'
+        - $ref: '#/components/parameters/FilterTaskID'
+        - $ref: '#/components/parameters/Event'
+        - $ref: '#/components/parameters/Owner'
+        - $ref: '#/components/parameters/Before'
+        - $ref: '#/components/parameters/After'
       responses:
         '200':
           description: Success.
@@ -4810,6 +4816,40 @@ components:
       required: true
       description: The task ID.
 
+    Event:
+      in: query
+      name: event
+      schema:
+        type: string
+      required: false
+      description: The name of event log.
+
+    Owner:
+      in: query
+      name: owner
+      schema:
+        type: string
+      required: false
+      description: The owner's name of event log.
+
+    Before:
+      in: query
+      name: before
+      schema:
+        type: string
+        format: date-time
+      required: false
+      description: Timestamp to select event logs occurring before.
+
+    After:
+      in: query
+      name: after
+      schema:
+        type: string
+        format: date-time
+      required: false
+      description: Timestamp to select event logs occurring after.
+
     MapIndex:
       in: path
       name: map_index
@@ -5147,6 +5187,22 @@ components:
       required: false
       description: Only filter the XCom records which have the provided key.
 
+    FilterDAGID:
+      in: query
+      name: dag_id
+      schema:
+        type: string
+      required: false
+      description: Returns objects matched by the DAG ID.
+
+    FilterTaskID:
+      in: query
+      name: task_id
+      schema:
+        type: string
+      required: false
+      description: Returns objects matched by the Task ID.
+
     # Other parameters
     FileToken:
       in: path
diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts
index 9b86f9c575..118804e290 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -2287,6 +2287,14 @@ export interface components {
     DAGID: string;
     /** @description The task ID. */
     TaskID: string;
+    /** @description The name of event log. */
+    Event: string;
+    /** @description The owner's name of event log. */
+    Owner: string;
+    /** @description Timestamp to select event logs occurring before. */
+    Before: string;
+    /** @description Timestamp to select event logs occurring after. */
+    After: string;
     /** @description The map index. */
     MapIndex: number;
     /** @description The DAG run ID. */
@@ -2424,6 +2432,10 @@ export interface components {
     Paused: boolean;
     /** @description Only filter the XCom records which have the provided key. */
     FilterXcomKey: string;
+    /** @description Returns objects matched by the DAG ID. */
+    FilterDAGID: string;
+    /** @description Returns objects matched by the Task ID. */
+    FilterTaskID: string;
     /**
      * @description The key containing the encrypted path to the file. Encryption and decryption take place only on
      * the server. This prevents the client from reading an non-DAG file. This also ensures API
@@ -3187,6 +3199,18 @@ export interface operations {
          * *New in version 2.1.0*
          */
         order_by?: components["parameters"]["OrderBy"];
+        /** Returns objects matched by the DAG ID. */
+        dag_id?: components["parameters"]["FilterDAGID"];
+        /** Returns objects matched by the Task ID. */
+        task_id?: components["parameters"]["FilterTaskID"];
+        /** The name of event log. */
+        event?: components["parameters"]["Event"];
+        /** The owner's name of event log. */
+        owner?: components["parameters"]["Owner"];
+        /** Timestamp to select event logs occurring before. */
+        before?: components["parameters"]["Before"];
+        /** Timestamp to select event logs occurring after. */
+        after?: components["parameters"]["After"];
       };
     };
     responses: {
diff --git a/tests/api_connexion/endpoints/test_event_log_endpoint.py b/tests/api_connexion/endpoints/test_event_log_endpoint.py
index ee1efba55b..ad154e57fe 100644
--- a/tests/api_connexion/endpoints/test_event_log_endpoint.py
+++ b/tests/api_connexion/endpoints/test_event_log_endpoint.py
@@ -233,6 +233,47 @@ class TestGetEventLogs(TestEventLogEndpoint):
 
         assert_401(response)
 
+    def test_should_filter_eventlogs_by_allowed_attributes(self, create_log_model, session):
+        eventlog1 = create_log_model(
+            event="TEST_EVENT_1",
+            dag_id="TEST_DAG_ID_1",
+            task_id="TEST_TASK_ID_1",
+            owner="TEST_OWNER_1",
+            when=self.default_time,
+        )
+        eventlog2 = create_log_model(
+            event="TEST_EVENT_2",
+            dag_id="TEST_DAG_ID_2",
+            task_id="TEST_TASK_ID_2",
+            owner="TEST_OWNER_2",
+            when=self.default_time_2,
+        )
+        session.add_all([eventlog1, eventlog2])
+        session.commit()
+        for attr in ["dag_id", "task_id", "owner", "event"]:
+            attr_value = f"TEST_{attr}_1".upper()
+            response = self.client.get(
+                f"/api/v1/eventLogs?{attr}={attr_value}", environ_overrides={"REMOTE_USER": "test"}
+            )
+            assert response.status_code == 200
+            assert {eventlog[attr] for eventlog in response.json["event_logs"]} == {attr_value}
+
+    def test_should_filter_eventlogs_by_when(self, create_log_model, session):
+        eventlog1 = create_log_model(event="TEST_EVENT_1", when=self.default_time)
+        eventlog2 = create_log_model(event="TEST_EVENT_2", when=self.default_time_2)
+        session.add_all([eventlog1, eventlog2])
+        session.commit()
+        for when_attr, expected_eventlogs in {
+            "before": {"TEST_EVENT_1"},
+            "after": {"TEST_EVENT_2"},
+        }.items():
+            response = self.client.get(
+                f"/api/v1/eventLogs?{when_attr}=2020-06-10T20%3A00%3A01%2B00%3A00",  # self.default_time + 1s
+                environ_overrides={"REMOTE_USER": "test"},
+            )
+            assert response.status_code == 200
+            assert {eventlog["event"] for eventlog in response.json["event_logs"]} == expected_eventlogs
+
 
 class TestGetEventLogPagination(TestEventLogEndpoint):
     @pytest.mark.parametrize(

Reply via email to