ChrnvaN opened a new issue, #39646:
URL: https://github.com/apache/airflow/issues/39646

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.8.1
   
   ### What happened?
   
   I wrote a listener plugin and added it to the plugins directory. The 
listener includes the methods:
   - on_task_instance_running
   - on_task_instance_success
   - on_task_instance_failed
   - on_dag_run_running
   - on_dag_run_success
   - on_dag_run_failed

   When I read values from Variables and Connections in the
   on_dag_run_running, on_dag_run_success, or on_dag_run_failed methods, the
   scheduler raises the following error as soon as a DAG run starts:
   ```
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 52, in _run_scheduler_job
       run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 79, in wrapper
       return func(*args, session=session, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/job.py", line 393, in run_job
       return execute_job(job, execute_callable=execute_callable)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/job.py", line 422, in execute_job
       ret = execute_callable()
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job_runner.py", line 855, in _execute
       self._run_scheduler_loop()
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job_runner.py", line 987, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job_runner.py", line 1063, in _do_scheduling
       self._start_queued_dagruns(session)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job_runner.py", line 1404, in _start_queued_dagruns
       dag_run.notify_dagrun_state_changed()
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 862, in notify_dagrun_state_changed
       get_listener_manager().hook.on_dag_run_running(dag_run=self, msg=msg)
     File "/home/airflow/.local/lib/python3.9/site-packages/pluggy/_hooks.py", line 493, in __call__
       return self._hookexec(self.name, self._hookimpls, kwargs, firstresult)
     File "/home/airflow/.local/lib/python3.9/site-packages/pluggy/_manager.py", line 115, in _hookexec
       return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
     File "/home/airflow/.local/lib/python3.9/site-packages/pluggy/_callers.py", line 113, in _multicall
       raise exception.with_traceback(exception.__traceback__)
     File "/home/airflow/.local/lib/python3.9/site-packages/pluggy/_callers.py", line 77, in _multicall
       res = hook_impl.function(*args)
     File "/opt/airflow/plugins/metadata/airflow_metadata.py", line 206, in on_dag_run_running
       my_connection = BaseHook.get_connection("my_connection")
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/hooks/base.py", line 82, in get_connection
       conn = Connection.get_connection_from_secrets(conn_id)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/connection.py", line 479, in get_connection_from_secrets
       raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
   airflow.exceptions.AirflowNotFoundException: The conn_id `my_connection` isn't defined
   ```
   
   The connection is explicitly defined in Connections, and the same lookups
   work in the task instance listener methods.

   How can I fix this so that the lookups also work when listening to DagRun
   events?
   
   
   ### What you think should happen instead?
   
   I expect the values to be retrieved from Connection and Variable when
   listening to DagRun events.
   
   ### How to reproduce
   
   Add a Connection named "my_connection" and a Variable named "environment",
   then place the following plugin in the plugins directory:
   
   ```
   from airflow.listeners import hookimpl
   from airflow.models.taskinstance import TaskInstance
   from airflow.hooks.base import BaseHook
   from airflow.models import Variable
   from airflow.models.dagrun import DagRun
   from airflow.plugins_manager import AirflowPlugin
   
   
   class AirflowListener:
       @hookimpl
       def on_task_instance_running(self, task_instance: TaskInstance) -> None:
           my_connection = BaseHook.get_connection("my_connection")
           env = Variable.get("environment")
   
       @hookimpl
       def on_task_instance_success(self, task_instance: TaskInstance) -> None:
           my_connection = BaseHook.get_connection("my_connection")
           env = Variable.get("environment")
   
       @hookimpl
       def on_task_instance_failed(self, task_instance: TaskInstance) -> None:
           my_connection = BaseHook.get_connection("my_connection")
           env = Variable.get("environment")
   
       @hookimpl
       def on_dag_run_running(self, dag_run: DagRun):
           my_connection = BaseHook.get_connection("my_connection")
           env = Variable.get("environment")
   
       @hookimpl
       def on_dag_run_success(self, dag_run: DagRun):
           my_connection = BaseHook.get_connection("my_connection")
           env = Variable.get("environment")
   
       @hookimpl
       def on_dag_run_failed(self, dag_run: DagRun):
           my_connection = BaseHook.get_connection("my_connection")
           env = Variable.get("environment")
           
           
   class AirflowListenerPlugin(AirflowPlugin):
       name = "AirflowListener"
       listeners = [AirflowListener()]
   
   ```
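
As a stopgap while this is investigated, the lookups in the DagRun hooks could be guarded so that a failed lookup is logged instead of propagating into the scheduler loop, as it does in the traceback above. This is only a sketch: `lookup_or_default` and `failing_getter` are hypothetical names, not Airflow APIs, and the guard does not explain or fix why the connection is not found.

```python
import logging
from typing import Any, Callable, Optional

log = logging.getLogger(__name__)


def lookup_or_default(
    getter: Callable[[str], Any], key: str, default: Optional[Any] = None
) -> Any:
    """Hypothetical helper, not part of Airflow.

    Call getter(key); if it raises, log the exception with its traceback
    and return `default` instead of letting the error reach the caller.
    """
    try:
        return getter(key)
    except Exception:
        log.exception("Lookup of %r failed inside a listener hook", key)
        return default


def failing_getter(conn_id: str) -> str:
    """Stand-in for a lookup that fails like the one in the traceback."""
    raise LookupError(f"The conn_id `{conn_id}` isn't defined")


# The failure is logged and the default is returned, nothing is raised.
missing = lookup_or_default(failing_getter, "my_connection")

# A getter that succeeds is passed through unchanged.
found = lookup_or_default(str.upper, "environment")
```

Inside `on_dag_run_running` the calls would be `lookup_or_default(BaseHook.get_connection, "my_connection")` and `lookup_or_default(Variable.get, "environment")`, with the hook body then checking for `None` before using the values.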
   
   ### Operating System
   
   macOS Sonoma 14.1.2
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
    apache/airflow:2.8.1-python3.9
    executor: CeleryExecutor
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

