tirkarthi commented on issue #36090: URL: https://github.com/apache/airflow/issues/36090#issuecomment-2053554297
I just found out that there is already a `cleanup` method as part of the interface. The problem is that `cleanup` is also called when the triggerer restarts, for example as part of a deployment. In that case we don't want to clean up (e.g. delete remote jobs), because the trigger would then start tracking invalid jobs. So I came up with this change: a marker attribute is set on the trigger only when it is cancelled as part of the `to_cancel` loop, and the trigger calls my custom cleanup function only when that marker is set. This makes cleanup run when tasks are cleared or marked success/failure.

I tried using a `contextvar`, but for some reason the update from `triggerer_job_runner` is not propagated to the trigger. Python 3.9 added the `msg` parameter to `Task.cancel`, which allows a custom message to be propagated as part of `CancelledError` in Python 3.11 and above.

This doesn't fully address the issue, but I thought I'd add my analysis in case someone finds the patch/approach useful.

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/triggers/base/index.html#airflow.triggers.base.BaseTrigger.cleanup
https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel

```python
from airflow.triggers.base import BaseTrigger


class CustomTrigger(BaseTrigger):
    async def cleanup(self):
        """
        Run the custom cleanup only when the trigger is cancelled from the to_cancel loop.

        cleanup is called whenever the triggerer cancels the trigger. That happens in
        the to_cancel loop, but also when the triggerer exits or restarts, e.g. during
        a deployment or after being marked unhealthy. In the restart case we must not
        clean up (for example by deleting the jobs tracked upstream), because the
        restarted triggerer would then track jobs that no longer exist.

        When the trigger is cancelled from the to_cancel loop, it is no longer present
        in the database and _cancelled_from_job_runner is set to True, so the custom
        cleanup is executed. E.g. a cleared task instance whose trigger is cancelled.
        """
        cancelled_from_runner = getattr(self, "_cancelled_from_job_runner", False)
        if cancelled_from_runner:
            # Our own cleanup logic, e.g. deleting the remote job this trigger tracks.
            self.custom_cleanup()
        await super().cleanup()
```

```patch
diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
index bb151b32cc..e80a910008 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -497,6 +497,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                     "name": f"{ti.dag_id}/{ti.run_id}/{ti.task_id}/{ti.map_index}/{ti.try_number} "
                     f"(ID {trigger_id})",
                     "events": 0,
+                    "trigger": trigger_instance,
                 }
             else:
                 self.log.warning("Trigger %s had insertion attempted twice", trigger_id)
@@ -512,6 +513,11 @@ class TriggerRunner(threading.Thread, LoggingMixin):
             trigger_id = self.to_cancel.popleft()
             if trigger_id in self.triggers:
                 # We only delete if it did not exit already
+                # These are tasks cancelled by the triggerer since they are not found in the database,
+                # e.g. a task instance cleared from the UI. _cancelled_from_job_runner is set so that
+                # our cleanup is executed only when needed and not during triggerer process shutdown,
+                # when cancel is also called (and cleanup runs) but this attribute is not present.
+                self.triggers[trigger_id]["trigger"]._cancelled_from_job_runner = True
                 self.triggers[trigger_id]["task"].cancel()
             await asyncio.sleep(0)
 
```
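The marker-attribute idea can be exercised without Airflow. This is a minimal sketch, not the triggerer itself: `FakeTrigger`, `run_trigger` and `demo` are hypothetical stand-ins, and it assumes (as the comment describes) that `cleanup()` is awaited whenever a trigger's task is cancelled, whether from the `to_cancel` loop or on shutdown. Only the trigger that had `_cancelled_from_job_runner` set before `cancel()` runs its custom cleanup.

```python
import asyncio


class FakeTrigger:
    """Hypothetical stand-in for a BaseTrigger subclass; no Airflow required."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.cleaned_up: list[str] = []

    async def run(self) -> None:
        await asyncio.sleep(3600)  # pretend to poll a remote job

    async def cleanup(self) -> None:
        # Same check as CustomTrigger.cleanup: only act if the runner set the marker.
        if getattr(self, "_cancelled_from_job_runner", False):
            self.cleaned_up.append(f"deleted remote job for {self.name}")


async def run_trigger(trigger: FakeTrigger) -> None:
    """Simplified runner: cleanup() runs however the task ends, including cancellation."""
    try:
        await trigger.run()
    finally:
        await trigger.cleanup()


async def demo() -> tuple[list[str], list[str]]:
    cleared, survivor = FakeTrigger("cleared"), FakeTrigger("survivor")
    # Mirrors the patched self.triggers entries, which now also keep the trigger object.
    triggers = {
        1: {"task": asyncio.create_task(run_trigger(cleared)), "trigger": cleared},
        2: {"task": asyncio.create_task(run_trigger(survivor)), "trigger": survivor},
    }
    await asyncio.sleep(0)  # let both tasks start and reach their sleep

    # to_cancel loop: trigger 1 is gone from the DB, so set the marker, then cancel.
    triggers[1]["trigger"]._cancelled_from_job_runner = True
    triggers[1]["task"].cancel()
    # Triggerer shutdown/restart: remaining tasks are cancelled without the marker.
    triggers[2]["task"].cancel()

    await asyncio.gather(*(t["task"] for t in triggers.values()), return_exceptions=True)
    return cleared.cleaned_up, survivor.cleaned_up


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (['deleted remote job for cleared'], [])
```

Keeping the trigger object in the `self.triggers` entry is what makes the marker reachable from the cancel loop; the restart path never touches the attribute, so `getattr(..., False)` keeps cleanup a no-op there.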
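One possible explanation for the `contextvar` not reaching the trigger, offered as an assumption since the comment doesn't show how the variable was set: `asyncio.create_task()` runs each task in a copy of the context that is current when the task is created, so a value the runner sets later (e.g. right before cancelling) is not visible inside the already-running trigger task. The variable `cancelled_from_runner` and the functions below are hypothetical.

```python
import asyncio
import contextvars

# Hypothetical flag the runner sets right before cancelling a trigger's task.
cancelled_from_runner = contextvars.ContextVar("cancelled_from_runner", default=False)


async def trigger_task(seen: list[bool]) -> None:
    try:
        await asyncio.sleep(3600)  # pretend to wait on a remote job
    finally:
        # Reads the context copy this task received from create_task().
        seen.append(cancelled_from_runner.get())


async def runner() -> list[bool]:
    seen: list[bool] = []
    task = asyncio.create_task(trigger_task(seen))  # context is copied here
    await asyncio.sleep(0)  # let the task start

    cancelled_from_runner.set(True)  # only changes the runner's own context
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return seen


if __name__ == "__main__":
    print(asyncio.run(runner()))  # [False]
```

An attribute on the trigger instance does not have this problem, because both the runner and the task hold a reference to the same object rather than separate context copies.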

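The `Task.cancel(msg=...)` route mentioned in the comment could look like the sketch below, assuming Python 3.9+ and a hypothetical marker string `CANCEL_FROM_RUNNER` (the patched cancel loop calls `cancel()` without a message). The cancelled coroutine inspects the `CancelledError` it receives and always re-raises it so the task still ends up cancelled.

```python
import asyncio

# Hypothetical reason string; any agreed-upon value would do.
CANCEL_FROM_RUNNER = "cancelled-from-job-runner"


async def trigger_task(name: str, results: dict[str, str]) -> None:
    try:
        await asyncio.sleep(3600)  # pretend to wait on a remote job
    except asyncio.CancelledError as exc:
        # exc.args carries the msg passed to Task.cancel(), if one was given.
        if exc.args and exc.args[0] == CANCEL_FROM_RUNNER:
            results[name] = "custom cleanup"
        else:
            results[name] = "skip cleanup"
        raise  # re-raise so the task really ends up cancelled


async def runner() -> dict[str, str]:
    results: dict[str, str] = {}
    cleared = asyncio.create_task(trigger_task("cleared", results))
    restarted = asyncio.create_task(trigger_task("restarted", results))
    await asyncio.sleep(0)  # let both tasks start

    cleared.cancel(msg=CANCEL_FROM_RUNNER)  # e.g. the to_cancel loop
    restarted.cancel()  # e.g. triggerer shutdown, no message
    await asyncio.gather(cleared, restarted, return_exceptions=True)
    return results


if __name__ == "__main__":
    print(asyncio.run(runner()))
```

The message is read inside the cancelled coroutine here; getting it to `BaseTrigger.cleanup()`, which takes no arguments, would still need extra plumbing, which is one reason this doesn't fully solve the issue on its own.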