sunank200 commented on code in PR #40084:
URL: https://github.com/apache/airflow/pull/40084#discussion_r1682692872


##########
airflow/dag_processing/processor.py:
##########
@@ -796,8 +818,16 @@ def _execute_task_callbacks(self, dagbag: DagBag | None, 
request: TaskCallbackRe
         if task:
             ti.refresh_from_task(task)
 
-        ti.handle_failure(error=request.msg, test_mode=self.UNIT_TEST_MODE, 
session=session)
-        self.log.info("Executed failure callback for %s in state %s", ti, 
ti.state)
+        if callback_type is TaskInstanceState.SUCCESS:
+            context = ti.get_template_context(session=session)
+            if not ti.task:
+                return
+            callbacks = ti.task.on_success_callback
+            _run_finished_callback(callbacks=callbacks, context=context)
+            self.log.info("Executed callback for %s in state %s", ti, ti.state)
+        elif not is_remote or callback_type is TaskInstanceState.FAILED:

Review Comment:
   In line 780, we are checking that `TaskCallbackRequest.is_failure_callback` 
is not a failure callback but here we are checking if the callback is submitted 
remotely or `callback_type` is `TaskInstanceState.FAILED`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to