GitHub user tongtie added a comment to the discussion: How to trigger a dagrun 
and specify a queue, so that the same dag can be executed on different workers.

@potiuk 
Could you please help me check if I understand correctly? I have the following 
DAG:
```python
# $AIRFLOW_HOME/dags/dynamic_pipelines.py

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
    'queue': 'queueA',
    'retries': 1,
}

with DAG(
    dag_id='example_dynamic_dag',
    default_args=default_args,
    schedule=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
    params={'queue': ''},
) as dag:
    
    task1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )
```

I added the following airflow_local_settings.py to both Data Center A and 
Data Center B:
```python
# $AIRFLOW_HOME/config/airflow_local_settings.py

from airflow.models.taskinstance import TaskInstance


def task_instance_mutation_hook(task_instance: TaskInstance):
    # Airflow discovers a function with this exact name in
    # airflow_local_settings.py and applies it to task instances.
    if task_instance.dag_id == 'example_dynamic_dag':
        # conf is empty when the run is triggered without --conf,
        # so read the key with .get() instead of indexing.
        conf = task_instance.dag_run.conf or {}
        if conf.get('queue') == 'queueB':
            task_instance.queue = 'queueB'
```

When I run this command, the task is executed by the worker in Data Center A:
```bash
airflow dags trigger example_dynamic_dag
```


When I run this command instead, the task is executed by the worker in Data 
Center B:
```bash
airflow dags trigger example_dynamic_dag --conf '{"queue" : "queueB"}'
```
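
The routing I expect from these two commands can be sanity-checked without a running scheduler. This is only a rough sketch of the rule as a plain function. `route_queue` and `fake_task_instance` are hypothetical names, and the `SimpleNamespace` objects only mimic the `dag_id`, `queue`, and `dag_run.conf` attributes. The starting queue of `queueA` is assumed from `default_args`. It does not exercise Airflow itself.

```python
from types import SimpleNamespace

ROUTED_DAG_ID = "example_dynamic_dag"


def route_queue(task_instance):
    """Switch the task instance to queueB when the run's conf asks for it."""
    if task_instance.dag_id != ROUTED_DAG_ID:
        return
    dag_run = getattr(task_instance, "dag_run", None)
    # Treat conf as empty when the run was triggered without --conf.
    conf = getattr(dag_run, "conf", None) or {}
    if conf.get("queue") == "queueB":
        task_instance.queue = "queueB"


def fake_task_instance(conf, dag_id=ROUTED_DAG_ID):
    """Hypothetical stand-in exposing only the attributes the rule reads."""
    return SimpleNamespace(
        dag_id=dag_id,
        queue="queueA",  # assumed default, taken from default_args
        dag_run=SimpleNamespace(conf=conf),
    )


# Mirrors: airflow dags trigger example_dynamic_dag
no_conf = fake_task_instance(conf=None)
# Mirrors: airflow dags trigger example_dynamic_dag --conf '{"queue" : "queueB"}'
with_conf = fake_task_instance(conf={"queue": "queueB"})
# A different DAG must never be rerouted, whatever its conf says.
other_dag = fake_task_instance(conf={"queue": "queueB"}, dag_id="some_other_dag")

for ti in (no_conf, with_conf, other_dag):
    route_queue(ti)

print(no_conf.queue, with_conf.queue, other_dag.queue)  # queueA queueB queueA
```

Whether the real hook sees a populated `dag_run` at the moment it is called is something this sketch cannot confirm, which is why the function reads both attributes defensively.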

Is this correct?

GitHub link: 
https://github.com/apache/airflow/discussions/46880#discussioncomment-12248956
