sunank200 commented on code in PR #40084:
URL: https://github.com/apache/airflow/pull/40084#discussion_r1682831071


##########
airflow/dag_processing/processor.py:
##########
@@ -763,8 +763,29 @@ def _execute_dag_callbacks(self, dagbag: DagBag, request: DagCallbackRequest, se
         if callbacks and context:
             DAG.execute_callback(callbacks, context, dag.dag_id)
 
-    def _execute_task_callbacks(self, dagbag: DagBag | None, request: TaskCallbackRequest, session: Session):
-        if not request.is_failure_callback:
+    def _execute_task_callbacks(
+        self, dagbag: DagBag | None, request: TaskCallbackRequest, session: Session
+    ) -> None:
+        """
+        Execute the task callbacks.
+
+        :param dagbag: the DagBag to use to get the task instance
+        :param request: the task callback request
+        :param session: the session to use
+        """
+        try:
+            callback_type = TaskInstanceState(request.task_callback_type)
+        except ValueError:
+            callback_type = None
+        is_remote = callback_type in (TaskInstanceState.SUCCESS, TaskInstanceState.FAILED)
+
+        # previously we ignored any request besides failures. now if given callback type directly,
+        # then we respect it and execute it. additionally because in this scenario the callback
+        # is submitted remotely, we assume there is no need to mess with state; we simply run
+        # the callback
+
+        if not is_remote and not request.is_failure_callback:
+            self.log.debug("not failure callback: %s", request)

Review Comment:
   Yes. This is removed now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to