mjpieters opened a new issue #12138:
URL: https://github.com/apache/airflow/issues/12138


Currently, the Celery executor imports the `Task` model of the Celery database backend directly:

https://github.com/apache/airflow/blob/2dd4e96045d4a7f45cc8c06df3d25c4f1479392c/airflow/executors/celery_executor.py#L38
   
However, Celery is highly customisable: its database backend refers to its model through `self.task_cls`, which defaults to either `Task` or `TaskExtended` depending on the Celery configuration (the `result_extended` setting). See

https://github.com/celery/celery/blob/406f04a082949ac42ec7a4af94fed896c515aaa4/celery/backends/database/__init__.py#L66

and

https://github.com/celery/celery/blob/406f04a082949ac42ec7a4af94fed896c515aaa4/celery/backends/database/__init__.py#L76-L77
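
The selection logic in the two linked snippets comes down to a class attribute that an instance may override in `__init__`, and that a subclass may override in turn. The sketch below mimics that pattern with hypothetical placeholder classes (`FakeDatabaseBackend`, `CustomTask`, `CustomBackend`) rather than Celery's real SQLAlchemy models, assuming the switch is driven by a single `extended_result` flag as in the linked code.

```python
class Task:
    """Hypothetical placeholder for Celery's plain result model."""


class TaskExtended(Task):
    """Hypothetical placeholder for the extended result model."""


class FakeDatabaseBackend:
    """Simplified, hypothetical stand-in for Celery's DatabaseBackend."""

    # Class-level default, like `task_cls = Task` in the linked code.
    task_cls = Task

    def __init__(self, extended_result=False):
        self.extended_result = extended_result
        # Instance-level override when extended results are enabled.
        if self.extended_result:
            self.task_cls = TaskExtended


class CustomTask(Task):
    """Hypothetical user-defined model."""


class CustomBackend(FakeDatabaseBackend):
    """A subclass can swap in its own model by overriding the attribute."""

    task_cls = CustomTask


print(FakeDatabaseBackend().task_cls.__name__)  # Task
print(FakeDatabaseBackend(extended_result=True).task_cls.__name__)  # TaskExtended
print(CustomBackend().task_cls.__name__)  # CustomTask
```

Code that imports `Task` by name sees none of these overrides; only reading `task_cls` from the backend instance does.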
   
Airflow should support this flexibility. Instead of importing a fixed class, `_get_many_from_db_backend` could use the model the backend was configured with, `app.backend.task_cls`:
   
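```python
# Query whichever model the configured result backend uses
# (Task, TaskExtended, or a custom class) instead of a hard-coded import.
Task = app.backend.task_cls
with session_cleanup(session):
    tasks = session.query(Task).filter(Task.task_id.in_(task_ids)).all()
```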
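
To illustrate the effect of the proposed change without a database, here is a sketch built on hypothetical in-memory stand-ins: `FakeSession.query_in` replaces the SQLAlchemy `query(...).filter(... .in_(...)).all()` chain, `FakeBackend` only exposes `task_cls`, and the models keep just a couple of columns. It assumes, as in Celery, that both models read the same result table and differ only in which columns they load.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class Task:
    """Simplified stand-in for the plain result model (two columns only)."""

    task_id: str
    status: str

    def to_dict(self):
        return {"task_id": self.task_id, "status": self.status}


@dataclass
class TaskExtended(Task):
    """Simplified stand-in for the extended model: same row, more columns."""

    name: Optional[str] = None
    worker: Optional[str] = None

    def to_dict(self):
        task_dict = super().to_dict()
        task_dict.update({"name": self.name, "worker": self.worker})
        return task_dict


class FakeSession:
    """Hypothetical in-memory session holding rows of one shared result table."""

    def __init__(self, rows):
        self.rows = rows

    def query_in(self, model, task_ids):
        # Load only the columns the model declares, for rows whose id matches.
        columns = [f.name for f in fields(model)]
        return [
            model(**{col: row.get(col) for col in columns})
            for row in self.rows
            if row["task_id"] in task_ids
        ]


class FakeBackend:
    """Hypothetical backend exposing the configured model as `task_cls`."""

    def __init__(self, task_cls):
        self.task_cls = task_cls


def get_many_from_db_backend(backend, session, task_ids):
    # The proposed change: ask the backend for its model instead of importing Task.
    task_model = backend.task_cls
    tasks = session.query_in(task_model, set(task_ids))
    return {task.task_id: task.to_dict() for task in tasks}


rows = [
    {"task_id": "a", "status": "SUCCESS", "name": "add", "worker": "celery@h1"},
    {"task_id": "b", "status": "PENDING", "name": "mul", "worker": None},
]
session = FakeSession(rows)
print(get_many_from_db_backend(FakeBackend(Task), session, ["a"]))
# {'a': {'task_id': 'a', 'status': 'SUCCESS'}}
print(get_many_from_db_backend(FakeBackend(TaskExtended), session, ["a"]))
# {'a': {'task_id': 'a', 'status': 'SUCCESS', 'name': 'add', 'worker': 'celery@h1'}}
```

The same lookup function returns richer dictionaries as soon as the backend is configured with the extended model, with no change on the executor side.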
   
I suppose there is one downside to this: `TaskExtended` fetches a few more columns per task. But the result of `task.to_dict()` is passed to `backend.meta_from_decoded()`, which may itself have been customised to use information stored in a custom model, so the extra columns are neither here nor there. Anyone using the `result_extended` option already pays that price elsewhere anyway.
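
The correctness side of this argument can be sketched with plain dictionaries standing in for the `to_dict()` output. The key names are assumed to follow Celery's `Task` and `TaskExtended` models, and `WorkerAwareBackend` is a hypothetical customisation: it shows how a `meta_from_decoded()` override that reads an extended column breaks when the executor queries the plain `Task` model.

```python
def task_to_dict(row):
    """Assumed shape of a plain Task.to_dict() result (simplified stand-in)."""
    return {k: row[k] for k in ("task_id", "status", "result", "traceback", "date_done")}


def task_extended_to_dict(row):
    """Assumed shape of TaskExtended.to_dict(): the base keys plus extra columns."""
    task_dict = task_to_dict(row)
    task_dict.update({k: row[k] for k in ("name", "args", "kwargs", "worker", "retries", "queue")})
    return task_dict


class WorkerAwareBackend:
    """Hypothetical backend whose meta_from_decoded() depends on an extended column."""

    def meta_from_decoded(self, meta):
        # Derive the host from the worker name, e.g. "celery@host1" -> "host1".
        meta["worker_host"] = meta["worker"].split("@", 1)[-1]
        return meta


row = {
    "task_id": "abc", "status": "SUCCESS", "result": 42, "traceback": None,
    "date_done": None, "name": "add", "args": (40, 2), "kwargs": {},
    "worker": "celery@host1", "retries": 0, "queue": "default",
}
backend = WorkerAwareBackend()
print(backend.meta_from_decoded(task_extended_to_dict(row))["worker_host"])  # host1
try:
    # Querying the hard-coded Task model drops the column the hook needs.
    backend.meta_from_decoded(task_to_dict(row))
except KeyError as exc:
    print("missing column:", exc)  # missing column: 'worker'
```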

