tirkarthi commented on issue #36090:
URL: https://github.com/apache/airflow/issues/36090#issuecomment-2053554297

   I just found out that there is already a `cleanup` method as part of the trigger interface. The problem is that `cleanup` is also called when the triggerer restarts, e.g. as part of a deployment. In that case we don't want to clean up (for example by deleting remote jobs), since the restarted triggerer would then start tracking invalid jobs. So I came up with the change below: a marker attribute is set only when a trigger is cancelled in the `to_cancel` loop, and my custom cleanup function runs only when that marker is present. This makes cleanup happen when tasks are cleared or marked success/failure.

   I tried using a `contextvar`, but for some reason the update made in `triggerer_job_runner` is not propagated to the trigger. Python 3.9 added a `msg` parameter to `Task.cancel`, and from Python 3.11 that message is propagated to the awaiter as part of `CancelledError`, which could be another way to pass the cancellation reason. This doesn't fully address the issue, but I thought I'd share my analysis in case someone finds the patch/approach useful.
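One likely explanation for the `contextvar` behaviour (an assumption, not verified against the triggerer code): `asyncio.create_task()` runs each task in a copy of the context taken at creation time, so a `ContextVar` set by the runner afterwards is never visible inside the already-running trigger task. The Airflow-free sketch below illustrates this; `cancelled_from_runner`, `trigger_body` and `runner` are hypothetical names.

```python
import asyncio
import contextvars

# Hypothetical flag the runner would like the trigger to see.
cancelled_from_runner = contextvars.ContextVar("cancelled_from_runner", default=False)


async def trigger_body(seen: list[bool]) -> None:
    """Stand-in for a trigger: block until cancelled, then read the flag."""
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        # This reads the task's own context copy, not the runner's context.
        seen.append(cancelled_from_runner.get())
        raise


async def runner() -> list[bool]:
    seen: list[bool] = []
    task = asyncio.create_task(trigger_body(seen))  # context is copied here
    await asyncio.sleep(0)  # let the task start and block in sleep()
    cancelled_from_runner.set(True)  # changes only the runner's context
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the task was cancelled
    return seen


if __name__ == "__main__":
    print(asyncio.run(runner()))  # [False]
```

A plain attribute on the trigger object does not have this problem, because the runner and the task share the same object rather than separate context copies.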
   
   
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/triggers/base/index.html#airflow.triggers.base.BaseTrigger.cleanup
   https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel
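The `Task.cancel(msg=...)` idea mentioned above could carry the cancellation reason instead of a marker attribute. A minimal sketch in plain asyncio, assuming Python 3.11 or later: the coroutine inspects `CancelledError.args` and only performs the destructive cleanup when the reason matches. `CANCEL_REASON`, `reason_aware_body` and `cancel_with` are hypothetical, and how Airflow's triggerer would forward such a message to `BaseTrigger.cleanup()` is not covered here.

```python
import asyncio
from typing import Optional

# Hypothetical cancellation reason the runner would pass to Task.cancel().
CANCEL_REASON = "cancelled_from_job_runner"


async def reason_aware_body(cleanups: list[str]) -> None:
    """Stand-in for a trigger that only runs destructive cleanup on a runner cancel."""
    try:
        await asyncio.sleep(3600)  # pretend to poll a remote job
    except asyncio.CancelledError as exc:
        # exc.args holds the msg passed to Task.cancel(), or is empty if none was given.
        if exc.args and exc.args[0] == CANCEL_REASON:
            cleanups.append("custom cleanup")  # e.g. delete the remote job
        else:
            cleanups.append("skipped")  # e.g. triggerer shutdown or restart
        raise  # always re-raise so the task still ends up cancelled


async def cancel_with(msg: Optional[str]) -> list[str]:
    cleanups: list[str] = []
    task = asyncio.create_task(reason_aware_body(cleanups))
    await asyncio.sleep(0)  # let the task start and block in sleep()
    task.cancel(msg=msg)
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the task was cancelled
    return cleanups


if __name__ == "__main__":
    print(asyncio.run(cancel_with(CANCEL_REASON)))  # ['custom cleanup']
    print(asyncio.run(cancel_with(None)))  # ['skipped']
```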
   
   ```python
   from airflow.triggers.base import BaseTrigger


   class CustomTrigger(BaseTrigger):
       # serialize() and run() are omitted here for brevity.

       async def cleanup(self):
           """
           Run the custom cleanup only when the trigger is cancelled from the to_cancel loop.

           cleanup is called when the trigger is cancelled by the triggerer. That can
           happen as part of the to_cancel loop, but also when the triggerer exits or
           restarts. We want to execute the custom cleanup only in the first case: the
           triggerer could be restarted during a deployment or marked as unhealthy, and
           cleanup such as deleting jobs tracked upstream would then leave the triggerer
           tracking invalid jobs.

           If the trigger is cancelled from the to_cancel loop (i.e. it is no longer
           present in the database), _cancelled_from_job_runner is set to True and the
           custom cleanup is executed, e.g. for a cleared task instance whose trigger
           is cancelled.
           """
           cancelled_from_runner = getattr(self, "_cancelled_from_job_runner", False)

           if cancelled_from_runner:
               # custom_cleanup() is your own cleanup method (not shown), e.g. deleting the remote job.
               self.custom_cleanup()

           await super().cleanup()
   ```
   
   ```patch
   diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
   index bb151b32cc..e80a910008 100644
   --- a/airflow/jobs/triggerer_job_runner.py
   +++ b/airflow/jobs/triggerer_job_runner.py
   @@ -497,6 +497,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                        "name": f"{ti.dag_id}/{ti.run_id}/{ti.task_id}/{ti.map_index}/{ti.try_number} "
                        f"(ID {trigger_id})",
                        "events": 0,
   +                    "trigger": trigger_instance,
                    }
                else:
                    self.log.warning("Trigger %s had insertion attempted twice", trigger_id)
   @@ -512,6 +513,11 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                trigger_id = self.to_cancel.popleft()
                if trigger_id in self.triggers:
                    # We only delete if it did not exit already
   +                # These triggers are cancelled by the triggerer because they are no longer in the database,
   +                # e.g. the task instance was cleared from the UI. _cancelled_from_job_runner is set so that
   +                # our cleanup runs only when needed and not during triggerer process shutdown, where
   +                # cancel() also leads to cleanup() being called but this attribute is not set.
   +                self.triggers[trigger_id]["trigger"]._cancelled_from_job_runner = True
                    self.triggers[trigger_id]["task"].cancel()
                await asyncio.sleep(0)
   ```
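To check the marker-attribute approach end to end without a running triggerer, here is a heavily simplified, hypothetical model in plain asyncio. `FakeTrigger`, `FakeRunner` and `demo` are made-up names, not Airflow APIs; `cancel_triggers()` mirrors the patched loop above, and `shutdown()` stands in for the restart path where triggers are cancelled without the marker. The real `TriggerRunner` does much more (event handling, removing finished triggers, and so on).

```python
import asyncio
from collections import deque


class FakeTrigger:
    """Hypothetical stand-in for CustomTrigger; records what cleanup() decided."""

    def __init__(self) -> None:
        self.custom_cleanup_ran = False

    async def run(self) -> None:
        await asyncio.sleep(3600)  # pretend to poll a remote job

    async def cleanup(self) -> None:
        if getattr(self, "_cancelled_from_job_runner", False):
            self.custom_cleanup_ran = True  # e.g. delete the remote job


class FakeRunner:
    """Hypothetical, heavily simplified model of TriggerRunner's bookkeeping."""

    def __init__(self) -> None:
        self.triggers: dict = {}
        self.to_cancel: deque = deque()

    async def run_trigger(self, trigger: FakeTrigger) -> None:
        try:
            await trigger.run()
        finally:
            # cleanup() runs on every cancellation, including shutdown.
            await trigger.cleanup()

    def create_trigger(self, trigger_id: int, trigger: FakeTrigger) -> None:
        self.triggers[trigger_id] = {
            "task": asyncio.create_task(self.run_trigger(trigger)),
            "trigger": trigger,  # the extra key added by the patch
        }

    async def cancel_triggers(self) -> None:
        while self.to_cancel:
            trigger_id = self.to_cancel.popleft()
            if trigger_id in self.triggers:
                self.triggers[trigger_id]["trigger"]._cancelled_from_job_runner = True
                self.triggers[trigger_id]["task"].cancel()
            await asyncio.sleep(0)

    async def shutdown(self) -> None:
        # Restart/shutdown path: cancel every task without setting the marker.
        tasks = [entry["task"] for entry in self.triggers.values()]
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def demo() -> tuple[bool, bool]:
    runner = FakeRunner()
    cleared, survivor = FakeTrigger(), FakeTrigger()
    runner.create_trigger(1, cleared)
    runner.create_trigger(2, survivor)
    await asyncio.sleep(0)  # let both triggers start

    runner.to_cancel.append(1)  # e.g. the task instance was cleared
    await runner.cancel_triggers()
    await runner.shutdown()  # e.g. a deployment restarts the triggerer
    return cleared.custom_cleanup_ran, survivor.custom_cleanup_ran


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (True, False)
```

Trigger 1 goes through `cancel_triggers()`, so the marker is set and its custom cleanup runs; trigger 2 is only cancelled by `shutdown()`, so `cleanup()` still runs but skips the destructive part.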

