ashb commented on code in PR #27113:
URL: https://github.com/apache/airflow/pull/27113#discussion_r1007040063


##########
airflow/config_templates/config.yml:
##########
@@ -2169,6 +2169,13 @@
       type: string
       example: ~
       default: "15"
+    - name: enable_dagrun_listener_notifications
+      description: |
+        Enable emitting dagrun listener notifications in scheduler.
+      version_added: 2.5.0
+      type: boolean
+      example: ~
+      default: "False"

Review Comment:
   Not sure we really need a config setting for this. (Thinking: we already have so many config options; adding another one should be avoided unless we really need it.)



##########
airflow/listeners/listener.py:
##########
@@ -47,6 +50,15 @@ def __init__(self):
     def has_listeners(self) -> bool:
         return len(self.pm.get_plugins()) > 0
 
+    @property
+    def has_scheduler_listeners(self) -> bool:
+        for plugin in self.pm.get_plugins():
+            if inspect.ismodule(plugin):

Review Comment:
   Can you explain what's going on here? Why do we need to check if it's a module? I thought Pluggy supports adding classes too? (But mostly: why do we care?)



##########
airflow/jobs/backfill_job.py:
##########
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import time
+from concurrent.futures import Executor, ThreadPoolExecutor

Review Comment:
   ```suggestion
   from concurrent.futures import Executor as FutureExecutor, ThreadPoolExecutor
   ```
   
   (and match this elsewhere), to avoid confusion with Airflow's own `Executor` class.
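For illustration only (the helper name `make_notification_pool` is hypothetical), the alias reads unambiguously in a type annotation:

```python
from __future__ import annotations

from concurrent.futures import Executor as FutureExecutor, ThreadPoolExecutor


def make_notification_pool(max_workers: int = 1) -> FutureExecutor:
    # The alias makes it obvious in signatures that this is a
    # concurrent.futures executor, not an Airflow executor.
    return ThreadPoolExecutor(max_workers=max_workers)
```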



##########
airflow/listeners/listener.py:
##########
@@ -47,6 +50,15 @@ def __init__(self):
     def has_listeners(self) -> bool:
         return len(self.pm.get_plugins()) > 0
 
+    @property

Review Comment:
   ```suggestion
       @cached_property
   ```
   
   Once this has been computed in a process, it can't change again.
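A minimal sketch of the caching behaviour, using the stdlib `functools.cached_property` (Python 3.8+) on a hypothetical stand-in class rather than Airflow's actual listener manager:

```python
from __future__ import annotations

from functools import cached_property


class ListenerManagerSketch:
    """Hypothetical stand-in for the listener manager, for illustration only."""

    def __init__(self, plugins: list[object]) -> None:
        self._plugins = plugins
        self.computations = 0  # counts how often the property body runs

    @cached_property
    def has_dagrun_listeners(self) -> bool:
        # Runs on first access only; the result is then stored in the
        # instance __dict__ and returned directly on later accesses.
        self.computations += 1
        return len(self._plugins) > 0
```

The flip side is that plugins registered after the first access are not reflected until the cached value is dropped, e.g. with `del manager.has_dagrun_listeners`.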



##########
airflow/listeners/listener.py:
##########
@@ -33,6 +34,8 @@
 
 _listener_manager = None
 
+_scheduler_hooks = ["on_dag_run_success", "on_dag_run_failure"]

Review Comment:
   Given these hooks are also called from backfill:
   
   ```suggestion
   _dagrun_hooks = ["on_dag_run_success", "on_dag_run_failure"]
   ```



##########
airflow/jobs/scheduler_job.py:
##########
@@ -1568,3 +1590,21 @@ def _cleanup_stale_dags(self, session: Session = NEW_SESSION) -> None:
             dag.is_active = False
             SerializedDagModel.remove_dag(dag_id=dag.dag_id, session=session)
         session.flush()
+
+    def notify_dagrun_state_changed(self, dag_run: DagRun, msg: str = ""):
+        if not self.enabled_dagrun_listener or not get_listener_manager().has_scheduler_listeners:
+            return
+
+        if dag_run.state == DagRunState.RUNNING:
+            self._notification_threadpool.submit(  # type: ignore[union-attr]
+                get_listener_manager().hook.on_dag_run_start, dag_run=dag_run, msg=msg
+            )
+        elif dag_run.state == DagRunState.SUCCESS:
+            self._notification_threadpool.submit(  # type: ignore[union-attr]
+                get_listener_manager().hook.on_dag_run_success, dag_run=dag_run, msg=msg
+            )
+        elif dag_run.state == DagRunState.FAILED:
+            self._notification_threadpool.submit(  # type: ignore[union-attr]
+                get_listener_manager().hook.on_dag_run_failure, dag_run=dag_run, msg=msg
+            )

Review Comment:
   I'm not sure the threadpool belongs in here.
   
   If we make that the _plugin's_ responsibility, then:
   
   a) I don't think forcing a threadpool (of a fixed size, though that could be config-driven) on users of this hook is required;
   b) We probably don't need `has_scheduler_listeners` anymore;
   c) This PR becomes a lot smaller.
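A rough sketch of what "the plugin's responsibility" could look like, assuming the scheduler simply calls the hooks synchronously: the listener owns its own pool (size chosen by the plugin author) and returns immediately. `ThreadedDagRunListener` and `_record` are hypothetical; a real plugin would also mark these methods with Airflow's `hookimpl` marker, which is omitted here to keep the sketch dependency-free.

```python
from __future__ import annotations

import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any


class ThreadedDagRunListener:
    """Hypothetical listener that offloads slow work to its own thread pool."""

    def __init__(self, max_workers: int = 2) -> None:
        # Pool size is the plugin's own choice, not a scheduler config option.
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._lock = threading.Lock()
        self.events: list[tuple[str, str]] = []

    def _record(self, event: str, run_id: str) -> None:
        # Stand-in for slow work such as a call to an external system.
        with self._lock:
            self.events.append((event, run_id))

    def on_dag_run_success(self, dag_run: Any, msg: str) -> None:
        # Queue the work and return at once, so the caller is not blocked.
        self._pool.submit(self._record, "success", dag_run.run_id)

    def on_dag_run_failure(self, dag_run: Any, msg: str) -> None:
        self._pool.submit(self._record, "failure", dag_run.run_id)

    def close(self) -> None:
        # Wait for queued notifications to finish before shutting down.
        self._pool.shutdown(wait=True)
```

Because `submit` only queues the work, a slow listener does not hold up the caller, and the scheduler itself would need neither a pool nor a `has_scheduler_listeners` check.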



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
