mjpieters opened a new issue #12138: URL: https://github.com/apache/airflow/issues/12138
Currently, the Celery executor imports the `Task` model for the Celery database backend directly:

https://github.com/apache/airflow/blob/2dd4e96045d4a7f45cc8c06df3d25c4f1479392c/airflow/executors/celery_executor.py#L38

However, Celery is highly customisable, and its backend implementation uses `self.task_cls`. This defaults to either `Task` or `TaskExtended` depending on the Celery configuration (`TaskExtended` is used when the `result_extended` option is enabled). See https://github.com/celery/celery/blob/406f04a082949ac42ec7a4af94fed896c515aaa4/celery/backends/database/__init__.py#L66 and https://github.com/celery/celery/blob/406f04a082949ac42ec7a4af94fed896c515aaa4/celery/backends/database/__init__.py#L76-L77

Airflow should support this flexibility: instead of importing a fixed class, it should use `app.backend.task_cls`. In `_get_many_from_db_backend`, that would look like:

```python
# Query whichever model the configured Celery backend uses (Task or TaskExtended)
Task = app.backend.task_cls
with session_cleanup(session):
    tasks = session.query(Task).filter(Task.task_id.in_(task_ids)).all()
```

I suppose there is one downside to this: `TaskExtended` fetches a few more columns per task. But the result of `task.to_dict()` is passed to `backend.meta_from_decoded()`, which could also have been customised to make use of information contained in a custom model, so that's neither here nor there. It would be the price anyone using the `result_extended` option would need to pay elsewhere anyway.
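To make the proposal concrete, here is a minimal, self-contained sketch of the pattern. It deliberately uses neither Celery nor SQLAlchemy. `Task`, `TaskExtended` and `FakeDatabaseBackend` are simplified hypothetical stand-ins that only mimic the class-selection logic linked above: plain `Task` by default, `TaskExtended` when `result_extended` is enabled. `get_many_from_db_backend` is likewise a hypothetical simplification of Airflow's `_get_many_from_db_backend`, not its real code.

```python
from dataclasses import asdict, dataclass, fields
from typing import Dict, Iterable, List, Optional, Type


@dataclass
class Task:
    """Hypothetical stand-in for Celery's Task model (not the real class)."""
    task_id: str
    status: str
    result: Optional[str] = None

    def to_dict(self) -> Dict[str, object]:
        # Serialise every column this model maps.
        return asdict(self)


@dataclass
class TaskExtended(Task):
    """Hypothetical stand-in for TaskExtended: same row, a few more columns."""
    name: Optional[str] = None
    args: Optional[str] = None


class FakeDatabaseBackend:
    """Hypothetical stand-in that mimics only the task_cls selection logic."""
    task_cls: Type[Task] = Task

    def __init__(self, result_extended: bool, table: List[dict]):
        if result_extended:
            self.task_cls = TaskExtended
        self._table = table  # rows hold every column, like the real table

    def query(self, model: Type[Task], task_ids: Iterable[str]) -> List[Task]:
        # Select only the columns the model maps, like an ORM query would.
        columns = {f.name for f in fields(model)}
        wanted = set(task_ids)
        return [
            model(**{k: v for k, v in row.items() if k in columns})
            for row in self._table
            if row["task_id"] in wanted
        ]


def get_many_from_db_backend(backend: FakeDatabaseBackend,
                             task_ids: Iterable[str]) -> Dict[str, Dict[str, object]]:
    # Ask the backend which model it uses instead of importing Task directly.
    task_cls = backend.task_cls
    tasks = backend.query(task_cls, task_ids)
    return {task.task_id: task.to_dict() for task in tasks}


TABLE = [
    {"task_id": "t1", "status": "SUCCESS", "result": "42", "name": "add", "args": "[40, 2]"},
    {"task_id": "t2", "status": "PENDING", "result": None, "name": "add", "args": "[1, 1]"},
]

plain = get_many_from_db_backend(FakeDatabaseBackend(False, TABLE), ["t1"])
extended = get_many_from_db_backend(FakeDatabaseBackend(True, TABLE), ["t1"])
print(plain["t1"])     # {'task_id': 't1', 'status': 'SUCCESS', 'result': '42'}
print(extended["t1"])  # the same keys plus 'name' and 'args'
```

The lookup function never names a model class. The backend's configuration decides both which model is queried and which extra columns end up in `to_dict()`, which is the trade-off described above.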
