lssatvik opened a new issue #20498:
URL: https://github.com/apache/airflow/issues/20498


   ### Description
   
   Right now the queue of a dag/task is determined by the queue parameter in 
the dag/task definition. I want this parameter to also accept a function. If a 
function is given, Airflow should call it with the context variable and use the 
return value as the queue for the dag/task. 
   
   Airflow dags and tasks already support callback functions such as 
on_success_callback, which is given the context variable and executed on 
success. I want a similar capability for determining the queue.
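   
   A minimal sketch of the requested behaviour, not an existing Airflow API: 
the helper `resolve_queue`, the alias `QueueSpec`, and the example callable 
`per_run_queue` are hypothetical names, and the context is assumed to be a 
plain dict carrying `dag_id` and `run_id` keys.

```python
from typing import Any, Callable, Dict, Union

# Hypothetical alias: either a fixed queue name or a callable that
# derives the queue name from a context dict.
QueueSpec = Union[str, Callable[[Dict[str, Any]], str]]


def resolve_queue(queue: QueueSpec, context: Dict[str, Any]) -> str:
    """Return the queue name, calling `queue` with the context if it is callable."""
    if callable(queue):
        return queue(context)
    return queue


def per_run_queue(context: Dict[str, Any]) -> str:
    """Example user-defined function: one queue per dag run.

    Assumes the context exposes `dag_id` and `run_id` keys.
    """
    return f"{context['dag_id']}-{context['run_id']}"
```

   With this shape, a plain string keeps working unchanged, while a callable 
is evaluated at the point where the context is available.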
   
   ### Use case/motivation
   
   I use an independent ec2 instance as a celery worker for every "dagrun". The 
queue for any dagrun is dag_id-run_id. In all my dags my first task is always 
an operator working in the "master" queue that sets up an ec2 instance and 
starts the worker with the custom queue name.
   
   I modified one line in the _enqueue_task_instances_with_queued_state 
function in scheduler_job.py:
   `queue = ti.queue if ti.queue == "master" else f"{ti.dag_id}-{ti.run_id}"`
   
   As a result, the queue for every task other than the ec2 operator is 
dag_id-run_id. Because the ec2 operator starts a celery worker with that 
specific queue name, all tasks not assigned to the "master" queue (which has a 
worker running locally) are executed on that celery worker.
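   
   The routing rule described above can be sketched in isolation as follows. 
`FakeTaskInstance` is a hypothetical stand-in carrying only the three 
attributes the rule reads, and `effective_queue` is an illustrative name, not a 
function in the Airflow code base.

```python
from dataclasses import dataclass


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for a task instance (only the fields the rule needs)."""
    dag_id: str
    run_id: str
    queue: str


def effective_queue(ti: FakeTaskInstance, master_queue: str = "master") -> str:
    """Keep tasks on the master queue there; route all others to a per-run queue."""
    if ti.queue == master_queue:
        return ti.queue
    return f"{ti.dag_id}-{ti.run_id}"


# The setup task stays on "master"; every other task goes to the per-run queue.
setup = FakeTaskInstance(dag_id="etl", run_id="run1", queue="master")
work = FakeTaskInstance(dag_id="etl", run_id="run1", queue="default")
```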
   
   So my setup required a small modification to the airflow code base. It would 
be helpful if the scheduler could determine the queue name through a 
user-defined function that receives the context variable.
   
   In the present state, I can only define a queue for each dag but not for 
each dag-run, as the run_id is determined at runtime. Also, the changed queue 
name is not reflected in the UI, as the queue name is only changed when a task 
instance is enqueued.
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


