Adaverse commented on issue #32379:
URL: https://github.com/apache/airflow/issues/32379#issuecomment-1637205100

   Did a small PoC on this, and it seems to work. 
   
   Created a new handler that saves only callback logs to a separate path, 
from which we can serve them to the UI. The directory structure could be 
`dagRun_id/latest_callback_run` or `dagRun_id/<attempt_number>`. Apart from 
the handler itself, the only change is that we direct all logs from callbacks 
to the above handler by routing the callbacks' stdout and stderr through 
StreamLogWriter.
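
To make the handler idea concrete, here is a minimal stdlib-only sketch of a handler that writes to `<base>/<dag_run_id>/<attempt_number>.log`. It is not the PoC code: `CallbackFileHandler`, its `set_context` signature, and `demo_callback_log` are hypothetical names chosen for illustration, and the real handler would presumably build on Airflow's existing file task handler.

```python
import logging
import tempfile
from pathlib import Path


class CallbackFileHandler(logging.Handler):
    """Hypothetical handler: one log file per DAG run and callback attempt."""

    def __init__(self, base_dir):
        super().__init__()
        self.base_dir = Path(base_dir)
        self._delegate = None  # real FileHandler, created in set_context()

    def set_context(self, dag_run_id, attempt=1):
        # Point the handler at <base_dir>/<dag_run_id>/<attempt>.log.
        if self._delegate is not None:
            self._delegate.close()
        path = self.base_dir / dag_run_id / f"{attempt}.log"
        path.parent.mkdir(parents=True, exist_ok=True)
        self._delegate = logging.FileHandler(path, encoding="utf-8")
        if self.formatter is not None:
            self._delegate.setFormatter(self.formatter)
        return path

    def emit(self, record):
        # Records arriving before set_context() are dropped in this sketch.
        if self._delegate is not None:
            self._delegate.emit(record)

    def close(self):
        if self._delegate is not None:
            self._delegate.close()
        super().close()


def demo_callback_log():
    """Write one record through the handler and return the file contents."""
    with tempfile.TemporaryDirectory() as base:
        handler = CallbackFileHandler(base)
        handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
        logger = logging.getLogger("demo.callback")
        logger.setLevel(logging.INFO)
        logger.propagate = False
        logger.addHandler(handler)
        path = handler.set_context("manual__2023-07-17", attempt=1)
        logger.info("callback started")
        handler.close()
        logger.removeHandler(handler)
        return path.read_text(encoding="utf-8")
```

Keying the path on the attempt number keeps earlier callback runs around, while a fixed `latest_callback_run` name would overwrite them; which is preferable depends on what the UI should show.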
   
   
https://github.com/apache/airflow/blob/09c5114f67f152c44b555ee408d76b26cfe0691f/airflow/models/dag.py#L1382-L1411
   
   So the above changes to:
   
   ```
       def handle_callback(self, dagrun, file_path: str, success=True, reason=None, session=NEW_SESSION):
   
           ############ initiate logger and set context ############
           callback_logger = logging.getLogger("airflow.callback")
           set_context(callback_logger, file_path)
           ############ initiate logger and set context ############
   
           callbacks = self.on_success_callback if success else self.on_failure_callback
           if callbacks:
               callbacks = callbacks if isinstance(callbacks, list) else [callbacks]
               tis = dagrun.get_task_instances(session=session)
               ti = tis[-1]  # get last TaskInstance of DagRun
               ti.task = self.get_task(ti.task_id)
               context = ti.get_template_context(session=session)
               context.update({"reason": reason})
               for callback in callbacks:
   
                   ############ Redirect callback logs to "airflow.callback" handler ############
                   with redirect_stdout(StreamLogWriter(callback_logger, logging.INFO)), redirect_stderr(
                       StreamLogWriter(callback_logger, logging.WARNING)
                   ):
                       try:
                           callback(context)
                       except Exception:
                           self.log.exception("failed to invoke dag state update callback")
                           Stats.incr("dag.callback_exceptions", tags={"dag_id": dagrun.dag_id})
   ```
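
For anyone who wants to try the redirect pattern outside Airflow, the following is a self-contained sketch of the same idea using only the standard library. `LoggerWriter` is a simplified stand-in for `StreamLogWriter`, and `ListHandler` and `run_callback_with_redirect` are hypothetical helpers for the demo; none of them are part of the PoC.

```python
import logging
from contextlib import redirect_stderr, redirect_stdout


class LoggerWriter:
    """Simplified stand-in for StreamLogWriter: logs each complete line."""

    def __init__(self, logger, level):
        self.logger = logger
        self.level = level
        self._buffer = ""

    def write(self, message):
        # print() may call write() several times per line, so buffer until "\n".
        self._buffer += message
        while "\n" in self._buffer:
            line, self._buffer = self._buffer.split("\n", 1)
            if line:
                self.logger.log(self.level, line)
        return len(message)

    def flush(self):
        # Emit any trailing text that did not end with a newline.
        if self._buffer:
            self.logger.log(self.level, self._buffer)
            self._buffer = ""


class ListHandler(logging.Handler):
    """Collects (levelname, message) tuples so the result can be inspected."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append((record.levelname, record.getMessage()))


def run_callback_with_redirect(callback, context):
    """Run callback(context) with stdout/stderr captured by a dedicated logger."""
    logger = logging.getLogger("demo.callback.redirect")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    handler = ListHandler()
    logger.addHandler(handler)
    out = LoggerWriter(logger, logging.INFO)
    err = LoggerWriter(logger, logging.WARNING)
    try:
        with redirect_stdout(out), redirect_stderr(err):
            try:
                callback(context)
            except Exception:
                logger.exception("failed to invoke callback")
        out.flush()
        err.flush()
    finally:
        logger.removeHandler(handler)
    return handler.records
```

Anything the callback prints ends up as INFO records, and anything written to stderr as WARNING records, which mirrors the levels used in the snippet above.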
   
   Wanted to get your thoughts on this @potiuk 


