GitHub user tongtie added a comment to the discussion: How to trigger a dagrun
and specify a queue, so that the same dag can be executed on different workers.
@potiuk
Could you please help me check if I understand correctly? I have the following
DAG:
```python
# $AIRFLOW_HOME/dags/dynamic_pipelines.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
default_args = {
    'queue': 'queueA',
    'retries': 1,
}

with DAG(
    dag_id='example_dynamic_dag',
    default_args=default_args,
    schedule=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
    params={'queue': ''},
) as dag:
    task1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )
```
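As a side note on why `task1` lands on `queueA` by default: `default_args` set on the DAG are merged into each operator's keyword arguments, with explicit task arguments taking precedence. A minimal pure-Python sketch of that merge (illustrative only, not Airflow's actual implementation):

```python
# Illustrative sketch of default_args merging; apply_defaults is a
# hypothetical name, not Airflow's internal function.
def apply_defaults(task_kwargs, default_args):
    """Merge DAG-level default_args into a task's kwargs."""
    merged = dict(default_args)
    merged.update(task_kwargs)  # explicit task arguments win over defaults
    return merged
```

So with the DAG above, `print_date` ends up with `queue='queueA'` unless the operator sets `queue` itself.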
I added the following airflow_local_settings.py separately in Data Center A and
Data Center B.
```python
# $AIRFLOW_HOME/config/airflow_local_settings.py
from airflow.models.taskinstance import TaskInstance


# Airflow looks for a module-level function with exactly this name in
# airflow_local_settings.py and calls it just before a task instance is queued.
def task_instance_mutation_hook(task_instance: TaskInstance):
    if task_instance.dag_id == 'example_dynamic_dag':
        # dag_run.conf is empty when the DAG is triggered without --conf,
        # so use .get() to avoid a KeyError.
        conf = task_instance.dag_run.conf or {}
        if conf.get('queue') == 'queueB':
            task_instance.queue = 'queueB'
```
When I execute this command without any conf, the task will be executed by a
worker in Data Center A (the default `queueA`).
```bash
airflow dags trigger example_dynamic_dag
```
When I execute this other command with a conf, the task will be executed by a
worker in Data Center B (`queueB`).
```bash
airflow dags trigger example_dynamic_dag --conf '{"queue" : "queueB"}'
```
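The same two triggers can also be issued through Airflow's stable REST API (`POST /api/v1/dags/{dag_id}/dagRuns`). A hedged sketch, assuming an API server at `localhost:8080`; authentication headers are omitted and would be required in a real deployment:

```python
# Sketch: trigger example_dynamic_dag over the stable REST API instead of
# the CLI. The host below is an assumption for illustration.
import json
from urllib import request


def build_trigger_payload(queue=None):
    """Build the JSON body for POST /api/v1/dags/{dag_id}/dagRuns."""
    conf = {'queue': queue} if queue else {}
    return {'conf': conf}


def trigger_dag(dag_id, queue=None, host='http://localhost:8080'):
    body = json.dumps(build_trigger_payload(queue)).encode()
    req = request.Request(
        f'{host}/api/v1/dags/{dag_id}/dagRuns',
        data=body,
        headers={'Content-Type': 'application/json'},
        method='POST',
    )
    # Real deployments need an Authorization header on the request.
    return request.urlopen(req)
```

`trigger_dag('example_dynamic_dag')` would correspond to the first CLI command, and `trigger_dag('example_dynamic_dag', queue='queueB')` to the second.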
Is this correct?
GitHub link:
https://github.com/apache/airflow/discussions/46880#discussioncomment-12248956