thesuperzapper commented on issue #36090:
URL: https://github.com/apache/airflow/issues/36090#issuecomment-2164294237

   @sunank200 @akaul my original solution proposed in https://github.com/apache/airflow/issues/36090#issuecomment-2094972855 was missing a critical part, which means that the solution you implemented (in the BigQuery and DataProc operators) needs to be updated, along with the not-yet-merged update to the Databricks operator (PR https://github.com/apache/airflow/pull/39373).
   
   The problem was that we would not correctly cancel the external job if the task was CLEARED, rather than being explicitly set to SUCCESS or FAILED. This is because when a task is cleared, the new task instance will likely already be DEFERRED again by the time the `asyncio.CancelledError` is thrown in the old trigger, so a check that only looks for the DEFERRED state will wrongly conclude it is not safe to cancel the external job.
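   
   For context, the cancellation handling in these deferrable triggers follows roughly the shape below. This is a sketch under my assumptions, not the exact provider code: `get_external_job_status()` and `cancel_external_job()` are hypothetical stand-ins for the provider-specific calls, and `serialize()` is stubbed out.
   
   ```python
   import asyncio
   
   from airflow.triggers.base import BaseTrigger, TriggerEvent
   
   
   class ExternalJobTrigger(BaseTrigger):
       """Sketch of a trigger that watches an external job and cancels it on kill."""
   
       def serialize(self):
           # Stubbed out; a real trigger must return its classpath and kwargs.
           return ("example_module.ExternalJobTrigger", {})
   
       async def run(self):
           try:
               # Poll the external system until the job reaches a terminal state.
               while True:
                   status = await self.get_external_job_status()  # hypothetical helper
                   if status in ("SUCCESS", "FAILED"):
                       yield TriggerEvent({"status": status})
                       return
                   await asyncio.sleep(30)
           except asyncio.CancelledError:
               # Raised when the task is marked success/failed or cleared, but ALSO
               # when the triggerer itself shuts down, so check before cancelling.
               if self.safe_to_cancel():
                   await self.cancel_external_job()  # hypothetical cancellation call
               raise
   ```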
   
   I found a solution, which is to update the `safe_to_cancel()` method to also 
return True when the `job_id` of the current TaskInstance has changed since we 
were deferred (which only happens when the task is rescheduled because it was 
cleared). 
   
   For example, here is the updated `safe_to_cancel()` definition I am using:
   
   ```python
       def safe_to_cancel(self) -> bool:
           """
           Whether it is safe to cancel the external job which is being executed by this trigger.

           This is to avoid the case where `asyncio.CancelledError` is raised because the trigger
           itself is being stopped. In those cases, we should NOT cancel the external job.
           """
           # A database query is needed to get the latest state of the task instance.
           task_instance = self.get_task_instance()

           # If the current `job_id` is different from when the trigger was created, the task
           # has been cleared and a new job has been created, so we should cancel the external
           # job we are waiting on.
           if int(task_instance.job_id) != int(self.job_id):
               return True

           # If the task is not in a deferred state, something else has happened to the task
           # since we were deferred (e.g. a manual state change), so we should cancel the
           # external job we are waiting on.
           return task_instance.state != TaskInstanceState.DEFERRED
   ```
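   
   For completeness, `safe_to_cancel()` depends on a `get_task_instance()` helper that queries the Airflow database. The upstream providers implement it roughly like the following sketch, which assumes the trigger was given the identifying details of its TaskInstance (`dag_id`, `task_id`, `run_id`, `map_index`) when it was created:
   
   ```python
   from sqlalchemy.orm.session import Session
   
   from airflow.exceptions import AirflowException
   from airflow.models.taskinstance import TaskInstance
   from airflow.utils.session import NEW_SESSION, provide_session
   from airflow.utils.state import TaskInstanceState  # needed by safe_to_cancel() above
   
   
   # Defined as a method on the trigger class; `self.task_instance` holds the
   # identifying details captured when the trigger was created.
   @provide_session
   def get_task_instance(self, session: Session = NEW_SESSION) -> TaskInstance:
       # Fetch the latest TaskInstance row for the task that deferred this trigger.
       task_instance = (
           session.query(TaskInstance)
           .filter(
               TaskInstance.dag_id == self.task_instance.dag_id,
               TaskInstance.task_id == self.task_instance.task_id,
               TaskInstance.run_id == self.task_instance.run_id,
               TaskInstance.map_index == self.task_instance.map_index,
           )
           .one_or_none()
       )
       if task_instance is None:
           raise AirflowException(f"TaskInstance not found for {self.task_instance!r}")
       return task_instance
   ```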
   
   ---
   
   So that people can do more testing, I have updated my reference example in https://github.com/apache/airflow/issues/36090#issuecomment-2094972855
   
   __NOTE:__ my reference is designed to work on all versions of Airflow with deferrable operators (i.e. 2.4.0+), but it can be simplified if we require 2.6.0+ like you have in the upstream providers; see the "TODO" for more context.
   
   ---
   
   @potiuk we might want to consider implementing my workaround as a default method on `BaseTrigger` and documenting it, or at least explaining the workaround in the official triggerer docs, because it's pretty critical that users know `on_kill` is NOT called for deferred tasks when they are manually marked as success/failed or cleared, as this will result in external jobs not being stopped when users expect.
   
   We also need to update all other operators that currently define `on_kill` and support deferral to use the workaround. Here is a search that helps us find them:
   
   - https://github.com/search?q=repo:apache/airflow+path:/%5Eairflow%5C/providers%5C//+on_kill(self)&type=code

