thesuperzapper commented on issue #36090: URL: https://github.com/apache/airflow/issues/36090#issuecomment-2164294237
@sunank200 @akaul my original solution proposed in https://github.com/apache/airflow/issues/36090#issuecomment-2094972855 had a critical part missing, which means that the solution you implemented (in the BigQuery and Dataproc operators) needs to be updated, along with the unmerged update to the Databricks operator (PR https://github.com/apache/airflow/pull/39373).

The problem was that we would not correctly cancel the external job if the task was CLEARED, rather than being set explicitly to SUCCESS or FAILED. This is because if the task is cleared, the new job will likely end up DEFERRED before the `asyncio.CancelledError` is even thrown.

I found a solution, which is to update the `safe_to_cancel()` method to also return True when the `job_id` of the current TaskInstance has changed since we were deferred (which only happens when the task is rescheduled because it was cleared).

For example, here is the updated `safe_to_cancel()` definition I am using:

```python
def safe_to_cancel(self) -> bool:
    """
    Whether it is safe to cancel the external job which is being executed by this trigger.

    This is to avoid the case that `asyncio.CancelledError` is called because the trigger
    itself is stopped. Because in those cases, we should NOT cancel the external job.
    """
    # Database query is needed to get the latest state of the task instance.
    task_instance = self.get_task_instance()

    # If the current job_id is different from when the trigger was created,
    # then we should cancel the external job we are waiting on because the task has been
    # cleared and a new job has been created.
    if int(task_instance.job_id) != int(self.job_id):
        return True

    # If the task is not in a deferred state, then something else has happened to the task
    # since we were deferred (e.g. a manual state change), so we should cancel the external
    # job we are waiting on.
    return task_instance.state != TaskInstanceState.DEFERRED
```

---

So people can do more testing, I have updated my reference example in https://github.com/apache/airflow/issues/36090#issuecomment-2094972855

__NOTE:__ my reference is designed to work on all versions of Airflow with deferrable operators (e.g. 2.4.0+), but it can be simplified if we require 2.6.0+ like you have in the upstream providers; see the "TODO" for more context.

---

@potiuk we might want to consider implementing my workaround as a default method on `BaseTrigger` and documenting it, or just explaining the workaround in the official docs about triggerers, because it's pretty critical that users know that `on_kill` is NOT called by triggers when they are manually set as success/failure/clear, as this will result in external jobs not being stopped when users expect.

We also need to update all other operators that currently define `on_kill` and support being deferred to use the workaround; here is a search that helps us find them (sketches of the supporting pieces follow below):

- https://github.com/search?q=repo:apache/airflow+path:/%5Eairflow%5C/providers%5C//+on_kill(self)&type=code
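---

For anyone adapting this to their own trigger, the `get_task_instance()` helper referenced above looks roughly like this. This is a sketch modeled on the pattern in the upstream BigQuery/Dataproc triggers; it assumes the trigger captured a `self.task_instance` reference (the TaskInstance that deferred) when it was created:

```python
from airflow.exceptions import AirflowException
from airflow.models.taskinstance import TaskInstance
from airflow.utils.session import NEW_SESSION, provide_session
from sqlalchemy.orm.session import Session


@provide_session
def get_task_instance(self, session: Session = NEW_SESSION) -> TaskInstance:
    # Look up the CURRENT TaskInstance row, not the stale copy captured when
    # the trigger was created, so we see any state/job_id changes made since
    # the task deferred.
    task_instance = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == self.task_instance.dag_id,
            TaskInstance.task_id == self.task_instance.task_id,
            TaskInstance.run_id == self.task_instance.run_id,
            TaskInstance.map_index == self.task_instance.map_index,
        )
        .one_or_none()
    )
    if task_instance is None:
        raise AirflowException(
            f"TaskInstance not found for "
            f"{self.task_instance.dag_id}.{self.task_instance.task_id} "
            f"(run_id={self.task_instance.run_id})"
        )
    return task_instance
```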
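And here is a minimal sketch of where `safe_to_cancel()` fits into a trigger's `run()` method. Everything named `ExampleJobTrigger`, `get_job_status`, and `cancel_job` is a hypothetical placeholder for your external system's client, not a real provider API; the class is assumed to also define the `safe_to_cancel()` and `get_task_instance()` methods from above, plus the captured `self.job_id` / `self.task_instance` state they rely on:

```python
from __future__ import annotations

import asyncio
from typing import Any, AsyncIterator

from airflow.triggers.base import BaseTrigger, TriggerEvent


async def get_job_status(job_id: str) -> str:
    """Placeholder for your external system's status API."""
    raise NotImplementedError


async def cancel_job(job_id: str) -> None:
    """Placeholder for your external system's cancel API."""
    raise NotImplementedError


class ExampleJobTrigger(BaseTrigger):
    """Hypothetical trigger that polls an external job (illustration only)."""

    def __init__(self, external_job_id: str, poll_interval: float = 10.0):
        super().__init__()
        self.external_job_id = external_job_id
        self.poll_interval = poll_interval

    def serialize(self) -> tuple[str, dict[str, Any]]:
        return (
            "example.triggers.ExampleJobTrigger",
            {"external_job_id": self.external_job_id, "poll_interval": self.poll_interval},
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        try:
            while True:
                status = await get_job_status(self.external_job_id)
                if status in ("SUCCESS", "FAILED"):
                    yield TriggerEvent({"status": status})
                    return
                await asyncio.sleep(self.poll_interval)
        except asyncio.CancelledError:
            # The triggerer cancels this coroutine BOTH when it is shutting
            # down (the external job must keep running) AND when the task was
            # marked success/failed or cleared (the external job should stop).
            # safe_to_cancel() is what tells these two cases apart.
            if self.safe_to_cancel():
                await cancel_job(self.external_job_id)
            raise
```

Note the `raise` at the end of the handler: the cancellation still has to propagate so the triggerer can clean up the coroutine, regardless of whether we cancelled the external job.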
