mobuchowski commented on code in PR #27113:
URL: https://github.com/apache/airflow/pull/27113#discussion_r1015305683


##########
airflow/jobs/scheduler_job.py:
##########
@@ -1568,3 +1590,21 @@ def _cleanup_stale_dags(self, session: Session = 
NEW_SESSION) -> None:
             dag.is_active = False
             SerializedDagModel.remove_dag(dag_id=dag.dag_id, session=session)
         session.flush()
+
+    def notify_dagrun_state_changed(self, dag_run: DagRun, msg: str = ""):
+        if not self.enabled_dagrun_listener or not 
get_listener_manager().has_scheduler_listeners:
+            return
+
+        if dag_run.state == DagRunState.RUNNING:
+            self._notification_threadpool.submit(  # type: ignore[union-attr]
+                get_listener_manager().hook.on_dag_run_start, dag_run=dag_run, 
msg=msg
+            )
+        elif dag_run.state == DagRunState.SUCCESS:
+            self._notification_threadpool.submit(  # type: ignore[union-attr]
+                get_listener_manager().hook.on_dag_run_success, 
dag_run=dag_run, msg=msg
+            )
+        elif dag_run.state == DagRunState.FAILED:
+            self._notification_threadpool.submit(  # type: ignore[union-attr]
+                get_listener_manager().hook.on_dag_run_failure, 
dag_run=dag_run, msg=msg
+            )

Review Comment:
   Removed threadpool.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to