vilozio opened a new issue, #39146: URL: https://github.com/apache/airflow/issues/39146
### Apache Airflow version

Other Airflow 2 version (please specify below)

### If "Other Airflow 2 version" selected, which one?

2.6.3

### What happened?

We have our own small Airflow plugin that notifies a Slack channel when a DAG run fails. It uses [hookimpl Listeners](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/listeners.html).

Twice, we hit a strange issue where the plugin went into an infinite loop and spammed the channel with lots of messages. We had to remove the plugin and restart the Airflow instances to stop it. We didn't see any anomalies in the logs, except that the scheduler detected a zombie job. So we suspect that something is wrong either with the way we wrote the plugin or with Airflow.

### What you think should happen instead?

_No response_

### How to reproduce

Unfortunately, we don't know how to reproduce it, but this issue has happened twice.

### Operating System

Ubuntu 20.04.6 LTS

### Versions of Apache Airflow Providers

_No response_

### Deployment

Google Cloud Composer

### Deployment details

Composer version: `composer-2.5.1-airflow-2.6.3`

Number of schedulers: 2

### Anything else?

This is the log line that appeared for this exact DAG id near the time the spam started.
`Detected zombie job: {'full_filepath': '/home/airflow/gcs/dags/myfolder/dag_player_value_model.py', 'processor_subdir': '/home/airflow/gcs/dags', 'msg': "{'DAG Id': 'player_value_model', 'Task Id': 'main', 'Run Id': 'scheduled__2024-04-19T08:00:00+00:00', 'Hostname': 'airflow-worker-48ssg', 'External Executor Id': '6c81b467-3ba0-430f-9fe1-fe48f3aca8a1'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f4050d6e760>, 'is_failure_callback': True}`

This is our plugin code:

<details>
<summary>PLUGIN CODE</summary>

```python
"""Airflow plugin to notify channels on DagRun failure."""
import logging
from datetime import datetime
from typing import List

import requests
from airflow import settings
from airflow.listeners import hookimpl
from airflow.models import DagModel, DagRun, DagTag, TaskInstance, Variable
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.state import DagRunState, TaskInstanceState
from sqlalchemy.orm import joinedload
from sqlalchemy.orm.session import Session

logger = logging.getLogger(__name__)

# Tag to disable alert for a dag.
NO_ALERT_TAG = "no alert"


def _is_alert_enabled(dag_id: str, session: Session) -> bool:
    """Return True if the alert is enabled for the given dag id."""
    dag_tags: List[DagTag] = (
        session.query(DagModel)
        .options(joinedload(DagModel.tags, innerjoin=False))
        .filter(DagModel.dag_id == dag_id)
        .first()
        .tags
    )
    for tag in dag_tags:
        if tag.name.lower() == NO_ALERT_TAG:
            return False
    return True


def notify_slack_on_dag(dag_run: DagRun, msg: str, session: Session) -> None:
    logger.info(
        f"Sending notification to slack channel... dag id {dag_run.dag_id} dag state {dag_run.state}"
    )
    now = datetime.utcnow()
    dt_string = now.strftime("%d/%m/%Y %H:%M:%S")

    if not _is_alert_enabled(dag_run.dag_id, session):
        logger.info(
            "Alert is disabled for the dag. "
            "Skip sending notification to slack channel."
        )
        return

    # For some reason, variable is not accessible in scheduler.
    # So, we are using session to get the variable.
    variable = session.query(Variable).filter(Variable.key == "GCP_PROJECT_ID").first()
    project_id = variable.get_val()

    failed_tasks = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_run == dag_run,
            TaskInstance.state == TaskInstanceState.FAILED,
        )
        .all()
    )
    failed_tasks_str = "\n".join(
        f"{task.task_id} - {task.log_url}" for task in failed_tasks
    )

    request_variables = {
        "notificationType": "Airflow Task Fail",
        "projectId": project_id,
        "processName": dag_run.dag_id,
        "timeStamp": dt_string,
        "errorMessage": msg + "\n" + failed_tasks_str,
    }

    variable = (
        session.query(Variable)
        .filter(Variable.key == "url_notifications_to_slack_channels")
        .first()
    )
    url = variable.get_val()

    response = requests.post(
        url=url,
        json=request_variables,
    )
    logger.info(
        f"Response from slack channel is {response.status_code} {response.text}"
    )


class NotifyOnFailurePlugin(AirflowPlugin):
    """Airflow plugin to notify channels on DagRun failure.

    For more information, see
    https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/plugins.html
    """

    class Listener:
        @hookimpl
        def on_dag_run_failed(
            self,
            dag_run: DagRun,
            msg: str,
        ) -> None:
            if dag_run.state != DagRunState.FAILED:
                return
            try:
                session = settings.Session()
                notify_slack_on_dag(dag_run, msg, session)
            except Exception as e:
                logger.error(f"Error in on_dag_run_failed: {e}")
            finally:
                session.close()

    # Name of the plugin.
    name = "NotifyOnFailurePlugin"

    # A list of Listeners that plugin provides. Listeners can register to
    # listen to particular events that happen in Airflow, like
    # TaskInstance state changes. Listeners are python classes or modules.
    listeners = [Listener()]
```

</details>

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
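One detail in the listener above may be worth ruling out. If `settings.Session()` itself raises, `session` is never bound. The `except` branch logs the error, and then the `finally` block raises `UnboundLocalError` out of the hook.

Below is a minimal, Airflow-independent sketch of two defensive patterns:

- opening the session before the `try`/`finally`;
- skipping a notification when the same `(dag_id, run_id)` has already been reported in the current process.

The helpers `run_with_session` and `notify_once` are hypothetical, not Airflow APIs. This is not a confirmed fix for the loop. In particular, an in-memory guard does not deduplicate across the two schedulers.

```python
"""Hypothetical helpers sketching two defensive patterns for a failure listener."""
import logging
import threading
from typing import Callable, Protocol, Set, Tuple

logger = logging.getLogger(__name__)


class ClosableSession(Protocol):
    """Anything with a close() method, e.g. a SQLAlchemy session."""

    def close(self) -> None: ...


def run_with_session(
    make_session: Callable[[], ClosableSession],
    work: Callable[[ClosableSession], None],
) -> None:
    """Open a session, run `work`, and always close it, without ever raising."""
    try:
        # Opened before the second try, so `finally` never sees an unbound name.
        session = make_session()
    except Exception:
        logger.exception("Could not open a session")
        return
    try:
        work(session)
    except Exception:
        logger.exception("Error in on_dag_run_failed")
    finally:
        session.close()


# In-memory record of runs already reported. It is per process only, so with
# two schedulers each process could still send one message per run.
_notified: Set[Tuple[str, str]] = set()
_lock = threading.Lock()


def notify_once(dag_id: str, run_id: str, send: Callable[[], None]) -> bool:
    """Call `send` only the first time (dag_id, run_id) is seen.

    Returns True if `send` was called, False if it was skipped.
    """
    key = (dag_id, run_id)
    with _lock:
        if key in _notified:
            logger.info("Already notified for %s, skipping", key)
            return False
        # Mark before sending so that a failing send is not retried in a loop.
        _notified.add(key)
    send()
    return True
```

Inside `on_dag_run_failed`, these could be combined as follows: pass `settings.Session` as `make_session`, and have `work` call `notify_once(dag_run.dag_id, dag_run.run_id, ...)` around `notify_slack_on_dag(dag_run, msg, session)`. A persistent guard, such as a row in the metadata database, would be needed to deduplicate across schedulers.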
