This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6afde788f58 dag run state change endpoints notify listeners about
state change (#45652)
6afde788f58 is described below
commit 6afde788f58a980507e93962a3c619d76981cdcf
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Wed Jan 15 17:24:34 2025 +0100
dag run state change endpoints notify listeners about state change (#45652)
Signed-off-by: Maciej Obuchowski <[email protected]>
---
.../api_fastapi/core_api/routes/public/dag_run.py | 4 ++
.../core_api/routes/public/test_dag_run.py | 24 +++++++++++
tests/listeners/class_listener.py | 47 +++++++++++++++++++++-
3 files changed, 73 insertions(+), 2 deletions(-)
diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py
b/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 9b8cfd6727b..cb846caaa2e 100644
--- a/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -61,6 +61,7 @@ from airflow.api_fastapi.core_api.datamodels.task_instances
import (
)
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
from airflow.exceptions import ParamValidationError
+from airflow.listeners.listener import get_listener_manager
from airflow.models import DAG, DagModel, DagRun
from airflow.models.dag_version import DagVersion
from airflow.timetables.base import DataInterval
@@ -159,10 +160,13 @@ def patch_dag_run(
attr_value = getattr(patch_body, "state")
if attr_value == DAGRunPatchStates.SUCCESS:
set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id,
commit=True, session=session)
+
get_listener_manager().hook.on_dag_run_success(dag_run=dag_run, msg="")
elif attr_value == DAGRunPatchStates.QUEUED:
set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id,
commit=True, session=session)
+ # Not notifying on queued - only notifying on RUNNING, this is
happening in scheduler
elif attr_value == DAGRunPatchStates.FAILED:
set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id,
commit=True, session=session)
+ get_listener_manager().hook.on_dag_run_failed(dag_run=dag_run,
msg="")
elif attr_name == "note":
# Once Authentication is implemented in this FastAPI app,
# user id will be added when updating dag run note
diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py
b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
index 9b7c9211fdd..fc171150534 100644
--- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -24,6 +24,7 @@ import pytest
import time_machine
from sqlalchemy import select
+from airflow.listeners.listener import get_listener_manager
from airflow.models import DagModel, DagRun
from airflow.models.asset import AssetEvent, AssetModel
from airflow.models.param import Param
@@ -943,6 +944,29 @@ class TestPatchDagRun:
body = response.json()
assert body["detail"][0]["msg"] == "Input should be 'queued',
'success' or 'failed'"
+ @pytest.fixture(autouse=True)
+ def clean_listener_manager(self):
+ get_listener_manager().clear()
+ yield
+ get_listener_manager().clear()
+
+ @pytest.mark.parametrize(
+ "state, listener_state",
+ [
+ ("queued", []),
+ ("success", [DagRunState.SUCCESS]),
+ ("failed", [DagRunState.FAILED]),
+ ],
+ )
+ def test_patch_dag_run_notifies_listeners(self, test_client, state,
listener_state):
+ from tests.listeners.class_listener import ClassBasedListener
+
+ listener = ClassBasedListener()
+ get_listener_manager().add_listener(listener)
+ response =
test_client.patch(f"/public/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}",
json={"state": state})
+ assert response.status_code == 200
+ assert listener.state == listener_state
+
class TestDeleteDagRun:
def test_delete_dag_run(self, test_client):
diff --git a/tests/listeners/class_listener.py
b/tests/listeners/class_listener.py
index 90ff6ab975e..b39f7278546 100644
--- a/tests/listeners/class_listener.py
+++ b/tests/listeners/class_listener.py
@@ -20,9 +20,9 @@ from __future__ import annotations
from airflow.listeners import hookimpl
from airflow.utils.state import DagRunState, TaskInstanceState
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS,
AIRFLOW_V_3_0_PLUS
-if AIRFLOW_V_2_10_PLUS:
+if AIRFLOW_V_3_0_PLUS:
class ClassBasedListener:
def __init__(self):
@@ -41,6 +41,49 @@ if AIRFLOW_V_2_10_PLUS:
stopped_component = component
self.state.append(DagRunState.SUCCESS)
+ @hookimpl
+ def on_task_instance_running(self, previous_state, task_instance):
+ self.state.append(TaskInstanceState.RUNNING)
+
+ @hookimpl
+ def on_task_instance_success(self, previous_state, task_instance):
+ self.state.append(TaskInstanceState.SUCCESS)
+
+ @hookimpl
+ def on_task_instance_failed(self, previous_state, task_instance,
error: None | str | BaseException):
+ self.state.append(TaskInstanceState.FAILED)
+
+ @hookimpl
+ def on_dag_run_running(self, dag_run, msg: str):
+ self.state.append(DagRunState.RUNNING)
+
+ @hookimpl
+ def on_dag_run_success(self, dag_run, msg: str):
+ self.state.append(DagRunState.SUCCESS)
+
+ @hookimpl
+ def on_dag_run_failed(self, dag_run, msg: str):
+ self.state.append(DagRunState.FAILED)
+
+elif AIRFLOW_V_2_10_PLUS:
+
+ class ClassBasedListener: # type: ignore[no-redef]
+ def __init__(self):
+ self.started_component = None
+ self.stopped_component = None
+ self.state = []
+
+ @hookimpl
+ def on_starting(self, component):
+ self.started_component = component
+ self.state.append(DagRunState.RUNNING)
+
+ @hookimpl
+ def before_stopping(self, component):
+ global stopped_component
+ stopped_component = component
+ self.state.append(DagRunState.SUCCESS)
+
@hookimpl
def on_task_instance_running(self, previous_state, task_instance,
session):
self.state.append(TaskInstanceState.RUNNING)