dima-asana commented on PR #23432: URL: https://github.com/apache/airflow/pull/23432#issuecomment-1123083198
First, thank you so much for working on this. We've run into a very similar issue and are eagerly looking forward to a fix. We also now have an isolated environment where we can reproduce this, so I'd be happy to test some version of this patch if you'd like (turnaround may be a week or so, sorry in advance). I went ahead and tested your current code with our setup and it works perfectly for us: the stuck task is revoked in Celery, so there is no chance of it running twice, and it is sent back to the scheduler.

> I'm far from a celery expert but as far as I can tell celery does not provide any capability to do this

I'm also not a Celery expert, but I believe tasks lost due to a worker shutdown like https://github.com/celery/celery/issues/7266 have already moved from the broker queue to the Celery worker's consumer. This means you would be able to find those tasks in Celery rather than on the broker. You might try looking at `app.control.inspect().reserved()` for tasks that got reserved but never made it to a worker process (a rough sketch of what I mean is at the end of this comment). Unfortunately, I don't think it's possible to differentiate these tasks from "legitimately reserved" tasks. That said, the default Airflow configuration gives legitimately reserved tasks a very short lifespan: with `prefetch_multiplier` set to 1, `acks_late` set to True, and the `fair` scheduling strategy, tasks are only reserved by workers that can "immediately" consume them (in practice, tasks are legitimately reserved for ~0.01s for us).

We experience this behavior due to a [different bug](https://github.com/celery/celery/issues/7515) in Celery. You can see a longer description of our experience [here](https://docs.google.com/document/d/1Peox1M7PUJQ4l1RR8DgWEDSyOMzWtbWDJnCQFpQIB54/edit?usp=sharing).

> "adopted task timeout" runs on every heartbeat

I'm either misunderstanding this or it's not accurate. At the least, there is an `orphaned_tasks_check_interval` setting, and in our experience it is honored. As you mentioned, unifying the orphaned-TI handling could be a simplification: let it pick up both TIs orphaned by a dead scheduler (primarily tasks in `SCHEDULED` state) and TIs orphaned by a malfunctioning Celery consumer (primarily tasks in `QUEUED` state).

As a side note, for tasks lost to shut-down workers like https://github.com/celery/celery/issues/7266, you might give [this config](https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-reject-on-worker-lost) in Celery a shot (a sketch of that is below as well).
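To illustrate the `reserved()` idea, here is a minimal sketch, assuming the module-level Celery app that the CeleryExecutor builds (`airflow.executors.celery_executor.app` on Airflow 2.x); adjust the import if you point Airflow at a custom Celery config:

```python
# Rough sketch only: list tasks that workers have reserved (prefetched) but
# not yet handed to a pool process. Assumes the executor's module-level
# Celery app; swap the import for your own app if you use a custom config.
from airflow.executors.celery_executor import app

inspector = app.control.inspect()

# reserved() returns {worker_hostname: [task_info_dict, ...]}, or None if no
# workers replied within the timeout.
reserved = inspector.reserved() or {}

for worker, tasks in reserved.items():
    for task in tasks:
        # Each task_info dict carries the Celery task id (which the executor
        # tracks) plus the task name and args.
        print(worker, task.get("id"), task.get("name"))
```

As noted above, nothing in that output distinguishes a stuck reservation from a healthy one; the only signal we have is that healthy reservations disappear almost immediately with the default Airflow settings.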
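And a hedged sketch of the `task_reject_on_worker_lost` suggestion, done by extending Airflow's default Celery config dict and pointing `[celery] celery_config_options` at it (the module name `my_celery_config` is just a placeholder):

```python
# my_celery_config.py (placeholder module name)
# Extend Airflow's default Celery settings and opt in to re-queueing tasks
# whose worker process dies mid-execution. The Celery docs warn this can make
# a task run more than once, so weigh it against your idempotency needs.
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG

CELERY_CONFIG = {
    **DEFAULT_CELERY_CONFIG,
    "task_reject_on_worker_lost": True,
}
```

with something like this in `airflow.cfg`:

```
[celery]
celery_config_options = my_celery_config.CELERY_CONFIG
```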
