dima-asana commented on PR #23432:
URL: https://github.com/apache/airflow/pull/23432#issuecomment-1123083198

   First, thank you so much for working on this.  We’ve run into a very similar 
issue and are eagerly looking forward to a fix.  We also now have an isolated 
environment where we can reproduce this, so I’d be happy to test some version 
of this patch if you’d like (may have turnarounds of a week or so, sorry in 
advance).  I went ahead and tested your current code with our setup, and it works 
perfectly for us - the stuck task is revoked in Celery (so there is no chance of it 
running twice) and sent back to the scheduler.
   
   > I'm far from a celery expert but as far as I can tell celery does not 
provide any capability to do this
   
   I’m also not a Celery expert, but I believe tasks lost to a worker shutdown 
like https://github.com/celery/celery/issues/7266 have already moved from the 
broker queue to the Celery worker’s consumer.  This means you would find those 
tasks in Celery rather than on the broker.  You might try looking at 
`app.control.inspect().reserved()` for tasks that were reserved but never made it 
to a worker process.  Unfortunately, I don’t think it’s possible to differentiate 
these tasks from “legitimately reserved” tasks.  That said, the default Airflow 
configuration gives legitimately reserved tasks a very short lifespan – with 
`prefetch_multiplier` set to 1, `acks_late` set to True, and the `fair` scheduling 
strategy, tasks are only reserved for workers that can “immediately” consume them 
(in practice, tasks are legitimately reserved for ~0.01s for us).
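
   For reference, here is a minimal sketch of that inspection (the broker URL is a 
placeholder for illustration; in an Airflow deployment you would presumably reuse 
the executor’s existing Celery app rather than constructing a new one):

```python
from celery import Celery

# Placeholder broker URL -- in Airflow you would likely import the executor's
# existing app instead of building one here.
app = Celery(broker="redis://localhost:6379/0")

# inspect().reserved() returns {worker_name: [task_info, ...]} for tasks a
# worker has prefetched/reserved but not yet started executing.
reserved = app.control.inspect().reserved() or {}

for worker, tasks in reserved.items():
    for task in tasks:
        # Each entry is a dict with keys such as "id", "name", and "args".
        print(f"{worker}: reserved {task['name']} (id={task['id']})")
```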
   
   We experience this behavior due to a [different 
bug](https://github.com/celery/celery/issues/7515) in Celery.  You can see a 
longer description of our experience 
[here](https://docs.google.com/document/d/1Peox1M7PUJQ4l1RR8DgWEDSyOMzWtbWDJnCQFpQIB54/edit?usp=sharing).
   
   > "adopted task timeout" runs on every heartbeat
   
   I’m either misunderstanding this or it’s not accurate.  At least, there is an 
`orphaned_tasks_check_interval` setting, and in our experience it is respected.  
As you mentioned, it could potentially be a simplification to unify the orphaned-TI 
behavior - e.g. let it pick up both TIs orphaned by a dead scheduler (primarily 
tasks in `SCHEDULED` state) and TIs orphaned by a malfunctioning Celery consumer 
(primarily tasks in `QUEUED` state).
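
   As a quick sanity check on what interval is actually in effect, one could read 
it back from the Airflow config (a minimal sketch, assuming Airflow 2.x where the 
option lives under the `[scheduler]` section):

```python
from airflow.configuration import conf

# Seconds between checks for orphaned task instances; assumes the option
# sits under the [scheduler] section, as in Airflow 2.x.
interval = conf.getfloat("scheduler", "orphaned_tasks_check_interval")
print(f"orphaned TIs are re-checked every {interval}s")
```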
   
   As a side note, for tasks lost when workers shut down, as in 
https://github.com/celery/celery/issues/7266, you might give [this 
config](https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-reject-on-worker-lost) 
in Celery a shot.
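
   For anyone who wants to try it, here is a rough sketch of how that could be 
wired into Airflow’s Celery config (the module name below is hypothetical, and the 
`DEFAULT_CELERY_CONFIG` import path is an assumption that may differ by Airflow 
version):

```python
# my_celery_config.py -- hypothetical module; point
# [celery] celery_config_options at my_celery_config.CELERY_CONFIG in airflow.cfg.
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG

CELERY_CONFIG = {
    **DEFAULT_CELERY_CONFIG,
    # With acks_late enabled, re-queue tasks whose worker process dies
    # mid-execution instead of acknowledging them as done.
    "task_reject_on_worker_lost": True,
}
```

   Note that the Celery docs warn this setting can cause message loops, so it’s 
worth testing carefully before relying on it.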

