vilozio opened a new issue, #39146:
URL: https://github.com/apache/airflow/issues/39146

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.6.3
   
   ### What happened?
   
   We have a small custom Airflow plugin that notifies a Slack channel when a DAG run fails. It uses [hookimpl listeners](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/listeners.html).
   Twice now, the plugin went into an infinite loop and spammed the channel with a large number of messages. Both times we had to remove the plugin and restart the Airflow instances to stop it. We saw no anomalies in the logs, except that the scheduler detected a zombie job. We therefore suspect that something is wrong either with the way we wrote the plugin or with Airflow itself.
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   Unfortunately, we don't know how to reproduce it, but it has happened twice.
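Since the issue could not be reproduced, one way to collect more evidence if it happens again is to log where each listener call runs. With two schedulers, this would show whether the repeated messages come from one process calling the hook many times or from several processes. The following is a minimal sketch only. `invocation_context` is a hypothetical helper and not part of Airflow.

```python
"""Hypothetical helper: record where a listener invocation ran."""

import os
import socket
import threading
from datetime import datetime, timezone
from typing import Dict


def invocation_context(dag_id: str, run_id: str) -> Dict[str, str]:
    """Return identifying details (host, process, thread, time) for one call."""
    return {
        "dag_id": dag_id,
        "run_id": run_id,
        "hostname": socket.gethostname(),
        "pid": str(os.getpid()),
        "thread": threading.current_thread().name,
        "utc_time": datetime.now(timezone.utc).isoformat(),
    }
```

It could be called at the top of `on_dag_run_failed`, for example `logger.info("on_dag_run_failed: %s", invocation_context(dag_run.dag_id, dag_run.run_id))`.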
   
   ### Operating System
   
   Ubuntu 20.04.6 LTS
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Google Cloud Composer
   
   ### Deployment details
   
   Composer version: `composer-2.5.1-airflow-2.6.3`
   Number of schedulers: 2
   
   
   ### Anything else?
   
   This log line appeared around the time the spam started, for this exact DAG ID:
   
   
   `Detected zombie job: {'full_filepath': '/home/airflow/gcs/dags/myfolder/dag_player_value_model.py', 'processor_subdir': '/home/airflow/gcs/dags', 'msg': "{'DAG Id': 'player_value_model', 'Task Id': 'main', 'Run Id': 'scheduled__2024-04-19T08:00:00+00:00', 'Hostname': 'airflow-worker-48ssg', 'External Executor Id': '6c81b467-3ba0-430f-9fe1-fe48f3aca8a1'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f4050d6e760>, 'is_failure_callback': True}`
   
   Here is our plugin code:
   
   <details>
   
   <summary>PLUGIN CODE</summary>
   
   ```python
   """Airflow plugin to notify channels on DagRun failure."""
   
   import logging
   from datetime import datetime
   from typing import List
   
   import requests
   from airflow import settings
   from airflow.listeners import hookimpl
   from airflow.models import DagModel, DagRun, DagTag, TaskInstance, Variable
   from airflow.plugins_manager import AirflowPlugin
   from airflow.utils.state import DagRunState, TaskInstanceState
   from sqlalchemy.orm import joinedload
   from sqlalchemy.orm.session import Session
   
   logger = logging.getLogger(__name__)
   
   
   # Tag to disable alert for a dag.
   NO_ALERT_TAG = "no alert"
   
   
   def _is_alert_enabled(dag_id: str, session: Session) -> bool:
       """Return True if the alert is enabled for the given dag id."""
       dag_tags: List[DagTag] = (
           session.query(DagModel)
           .options(joinedload(DagModel.tags, innerjoin=False))
           .filter(DagModel.dag_id == dag_id)
           .first()
           .tags
       )
       for tag in dag_tags:
           if tag.name.lower() == NO_ALERT_TAG:
               return False
       return True
   
   
   def notify_slack_on_dag(dag_run: DagRun, msg: str, session: Session) -> None:
       logger.info(
           f"Sending notification to slack channel... dag id {dag_run.dag_id} dag state {dag_run.state}"
       )
       now = datetime.utcnow()
       dt_string = now.strftime("%d/%m/%Y %H:%M:%S")
   
       if not _is_alert_enabled(dag_run.dag_id, session):
           logger.info(
               "Alert is disabled for the dag. "
               "Skip sending notification to slack channel."
           )
           return
       # For some reason, variable is not accessible in scheduler.
       # So, we are using session to get the variable.
       variable = session.query(Variable).filter(Variable.key == "GCP_PROJECT_ID").first()
       project_id = variable.get_val()
   
       failed_tasks = (
           session.query(TaskInstance)
           .filter(
               TaskInstance.dag_run == dag_run,
               TaskInstance.state == TaskInstanceState.FAILED,
           )
           .all()
       )
   
       failed_tasks_str = "\n".join(
           f"{task.task_id} - {task.log_url}" for task in failed_tasks
       )
   
       request_variables = {
           "notificationType": "Airflow Task Fail",
           "projectId": project_id,
           "processName": dag_run.dag_id,
           "timeStamp": dt_string,
           "errorMessage": msg + "\n" + failed_tasks_str,
       }
       variable = (
           session.query(Variable)
           .filter(Variable.key == "url_notifications_to_slack_channels")
           .first()
       )
       url = variable.get_val()
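       response = requests.post(
           url=url,
           json=request_variables,
           # Without a timeout, requests can wait indefinitely on an unresponsive
           # endpoint and block the process running the listener (30 s is an assumed value).
           timeout=30,
       )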
       logger.info(
           f"Response from slack channel is {response.status_code} {response.text}"
       )
   
   
   class NotifyOnFailurePlugin(AirflowPlugin):
       """Airflow plugin to notify channels on DagRun failure.
   
       For more information, see https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/plugins.html
       """
   
       class Listener:
           @hookimpl
           def on_dag_run_failed(
               self,
               dag_run: DagRun,
               msg: str,
           ) -> None:
               if dag_run.state != DagRunState.FAILED:
                   return
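               # Create the session before `try` so that `finally` never refers to
               # an unbound name if Session() itself raises.
               session = settings.Session()
               try:
                   notify_slack_on_dag(dag_run, msg, session)
               except Exception:
                   # logger.exception also records the traceback.
                   logger.exception("Error in on_dag_run_failed")
               finally:
                   session.close()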
   
       # Name of the plugin.
       name = "NotifyOnFailurePlugin"
       # A list of Listeners that plugin provides. Listeners can register to
       # listen to particular events that happen in Airflow, like
       # TaskInstance state changes. Listeners are python classes or modules.
       listeners = [Listener()]
   
   ```
   </details>
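As a possible stopgap while the root cause is unknown, the listener could skip runs it has already reported. The following sketch is hypothetical (`NotificationDeduplicator` is not an Airflow API) and assumes the duplicates are repeated calls for the same `(dag_id, run_id)`. Its state is held in memory, so it only works within a single process. With the two schedulers in this deployment, each process could still send one message, and a restart clears the state.

```python
"""Hypothetical in-process guard against duplicate failure notifications."""

import threading
import time
from typing import Callable, Dict, Tuple


class NotificationDeduplicator:
    """Remember recently notified (dag_id, run_id) pairs for ttl_seconds."""

    def __init__(
        self,
        ttl_seconds: float = 3600.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._seen: Dict[Tuple[str, str], float] = {}
        self._lock = threading.Lock()

    def should_notify(self, dag_id: str, run_id: str) -> bool:
        """Return True only the first time a run is seen within the TTL window."""
        now = self._clock()
        key = (dag_id, run_id)
        with self._lock:
            # Drop expired entries so the dict does not grow without bound.
            expired = [k for k, ts in self._seen.items() if now - ts >= self._ttl]
            for k in expired:
                del self._seen[k]
            if key in self._seen:
                return False
            self._seen[key] = now
            return True
```

In `on_dag_run_failed`, a module-level instance could be checked right after the state check. For example: `if not DEDUP.should_notify(dag_run.dag_id, dag_run.run_id): return`. This only limits the symptom. It does not explain why the hook fires repeatedly.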
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

