Adaverse commented on issue #32379: URL: https://github.com/apache/airflow/issues/32379#issuecomment-1637205100
Did a small PoC on this, and it seems to work. I created a new handler that saves only callback logs to a separate path, from which we can serve them to the UI. The directory structure is something like `dagRun_id/latest_callback_run`, or we could use `dagRun_id/<attempt_number>`. Apart from the handler itself, the only change is that we direct all logs from callbacks to that handler and propagate them to StreamLogWriter.

https://github.com/apache/airflow/blob/09c5114f67f152c44b555ee408d76b26cfe0691f/airflow/models/dag.py#L1382-L1411

So the code above changes to:

```
# Imports this change needs in dag.py, in addition to what the module already has
import logging
from contextlib import redirect_stderr, redirect_stdout

from airflow.utils.log.logging_mixin import StreamLogWriter, set_context


def handle_callback(self, dagrun, file_path: str, success=True, reason=None, session=NEW_SESSION):
    ############ initiate logger and set context ############
    callback_logger = logging.getLogger("airflow.callback")
    set_context(callback_logger, file_path)
    ############ end: initiate logger and set context ############

    callbacks = self.on_success_callback if success else self.on_failure_callback
    if callbacks:
        callbacks = callbacks if isinstance(callbacks, list) else [callbacks]
        tis = dagrun.get_task_instances(session=session)
        ti = tis[-1]  # use the last TaskInstance returned for the DagRun
        ti.task = self.get_task(ti.task_id)
        context = ti.get_template_context(session=session)
        context.update({"reason": reason})
        for callback in callbacks:
            ############ Redirect callback logs to "airflow.callback" handler ############
            # stdout is logged at INFO and stderr at WARNING on the callback logger
            with redirect_stdout(StreamLogWriter(callback_logger, logging.INFO)), redirect_stderr(
                StreamLogWriter(callback_logger, logging.WARNING)
            ):
                try:
                    callback(context)
                except Exception:
                    self.log.exception("failed to invoke dag state update callback")
                    Stats.incr("dag.callback_exceptions", tags={"dag_id": dagrun.dag_id})
```

Wanted to get your thoughts on this @potiuk
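
To make the handler side of the idea concrete, here is a rough, stdlib-only sketch of what a callback-only handler could look like. It is not the PoC's handler (that code is not shown in this comment) and it does not use any Airflow APIs: `CallbackFileHandler`, `LoggerWriter`, `run_callback_with_logs`, the `example.callback` logger name, and the `<base>/<dag_run_id>/<attempt>.log` layout are all hypothetical names chosen for illustration. `LoggerWriter` is only a minimal stand-in for what `StreamLogWriter` does in the snippet above.

```python
import logging
import os
from contextlib import redirect_stderr, redirect_stdout


class CallbackFileHandler(logging.Handler):
    """Hypothetical handler: writes records to <base>/<dag_run_id>/<attempt>.log."""

    def __init__(self, base_log_folder):
        super().__init__()
        self.base_log_folder = base_log_folder
        self._delegate = None  # real FileHandler, created once the context is known

    def set_context(self, dag_run_id, attempt=1):
        """Point the handler at the log file for one callback run and return its path."""
        directory = os.path.join(self.base_log_folder, dag_run_id)
        os.makedirs(directory, exist_ok=True)
        path = os.path.join(directory, f"{attempt}.log")
        if self._delegate is not None:
            self._delegate.close()
        self._delegate = logging.FileHandler(path, encoding="utf-8")
        self._delegate.setFormatter(logging.Formatter("%(levelname)s - %(message)s"))
        return path

    def emit(self, record):
        # Records emitted before set_context() are dropped in this sketch.
        if self._delegate is not None:
            self._delegate.emit(record)

    def close(self):
        if self._delegate is not None:
            self._delegate.close()
            self._delegate = None
        super().close()


class LoggerWriter:
    """Minimal stand-in for a stream-to-logger adapter: logs each complete line."""

    def __init__(self, logger, level):
        self.logger = logger
        self.level = level
        self._buffer = ""

    def write(self, message):
        self._buffer += message
        while "\n" in self._buffer:
            line, self._buffer = self._buffer.split("\n", 1)
            if line:
                self.logger.log(self.level, line)

    def flush(self):
        if self._buffer:
            self.logger.log(self.level, self._buffer)
            self._buffer = ""


def run_callback_with_logs(callback, context, base_log_folder, dag_run_id, attempt=1):
    """Run one callback with stdout/stderr redirected to the callback log file."""
    logger = logging.getLogger("example.callback")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep callback output out of the parent loggers
    handler = CallbackFileHandler(base_log_folder)
    path = handler.set_context(dag_run_id, attempt)
    logger.addHandler(handler)
    out = LoggerWriter(logger, logging.INFO)
    err = LoggerWriter(logger, logging.WARNING)
    try:
        with redirect_stdout(out), redirect_stderr(err):
            try:
                callback(context)
            except Exception:
                logger.exception("callback failed")
        # Emit any trailing text that did not end with a newline.
        out.flush()
        err.flush()
    finally:
        logger.removeHandler(handler)
        handler.close()
    return path
```

With this layout, serving the logs for a given DagRun would only require reading files under `<base>/<dag_run_id>/`; whether to keep one file per attempt or only the latest run is the same open choice mentioned above.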
