lssatvik opened a new issue #20498:
URL: https://github.com/apache/airflow/issues/20498
### Description
Right now the queue of a dag/task is determined by the queue parameter in
dag/task definition. I want this parameter to take a function as input. If a
function is given as input, it should pass a context variable and use the
return value as the queue for the dag/task.
Airflow dags and tasks support callback functions like for
on_success_callback, which is given the context variable and executed on
success. I want similar capability for determining queue.
### Use case/motivation
I use an independent ec2 instance as a celery worker for every "dagrun". The
queue for any dagrun is dag_id-run_id. In all my dags my first task is always
an operator working in the "master" queue that sets up an ec2 instance and
starts the worker with the custom queue name.
I modified a line in the _enqueue_task_instances_with_queued_state function
in scheduler_job.py:-
`queue = ti.queue if ti.queue == "master" else f"{ti.dag_id}-{ti.run_id}"`
So finally the queue for every task that is not the ec2 operator is
dag_id-run_id. As the ec2_operator starts a celery worker with that specific
queue name all tasks not defined with "master" queue (which has a worker
running locally) are executed in the celery worker.
So my setup required a small modification to the airflow code base. It would
be helpful if the scheduler can determine the queue name through a user-defined
function using the context variable.
In the present state, I can only define a queues for each dag but not for
each dag-run, as the run_id is determined at runtime. Also the changed queue
name does not reflect in the UI as the queue name is only changed when a task
instance is enqueued.
### Related issues
_No response_
### Are you willing to submit a PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]