baraknielsen opened a new issue #15458:
URL: https://github.com/apache/airflow/issues/15458
**Apache Airflow version**:
1.10.10
**What happened**:
Hi all,
When I trigger a DAG that contains two-level nested subdags from another DAG,
the triggering task fails with an error saying that the key of one of the
subdags already exists, although the DAG does eventually get triggered.
Looking at the Airflow code, I saw that calling main_dag.subdags returns all
of the subdags, including the ones nested inside its own subdags.
So when I run the TriggerDagRunOperator, it tries to trigger the second-level
subdag twice because of this Airflow code:
```python
while dags_to_trigger:
    dag = dags_to_trigger.pop()
    trigger = dag.create_dagrun(
        run_id=run_id,
        execution_date=execution_date,
        state=State.RUNNING,
        conf=run_conf,
        external_trigger=True,
    )
    triggers.append(trigger)
    if dag.subdags:
        dags_to_trigger.extend(dag.subdags)
```
This code is in airflow/api/common/experimental/trigger_dag.py:91
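To make the double trigger easier to see, here is a minimal sketch that replays the loop above without Airflow. `FakeDag` and `collect_trigger_order` are hypothetical names made up for this illustration, and the sketch assumes that `subdags` returns every descendant (not only direct children), as described above.

```python
class FakeDag:
    """Hypothetical stand-in for a DAG: only models dag_id and subdags."""

    def __init__(self, dag_id, children=()):
        self.dag_id = dag_id
        self.children = list(children)

    @property
    def subdags(self):
        # Assumption: like the behaviour described in this issue, return
        # all descendants, not just the direct children.
        found = []
        for child in self.children:
            found.append(child)
            found.extend(child.subdags)
        return found


def collect_trigger_order(root):
    """Replay the while loop, recording dag_ids instead of creating DAG runs."""
    triggered = []
    dags_to_trigger = [root]
    while dags_to_trigger:
        dag = dags_to_trigger.pop()
        triggered.append(dag.dag_id)
        if dag.subdags:
            dags_to_trigger.extend(dag.subdags)
    return triggered


# Same shape as the reproduction: TriggeredDag -> sub1 -> sub2
sub2 = FakeDag("TriggeredDag.sub1.sub2")
sub1 = FakeDag("TriggeredDag.sub1", [sub2])
main = FakeDag("TriggeredDag", [sub1])

order = collect_trigger_order(main)
print(order)
```

In this sketch the second-level subdag is queued once through the top-level DAG's `subdags` and once more through `sub1.subdags`, so it shows up twice in `order`, which would match the duplicate `(dag_id, run_id)` key in the log.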
**What you expected to happen**:
The TriggerDagRunOperator task should finish in the success state.
**How to reproduce it**:
Triggering DAG code:
```python
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.utils.dates import days_ago

interval = '30 8 * * *'
default_args = {
    'owner': 'airflow',
    "depends_on_past": False,
    "start_date": days_ago(1),
    "catchup": False,
}

with DAG("TriggerExample",
         default_args=default_args,
         catchup=False,
         schedule_interval=interval,
         max_active_runs=1,
         ) as dag:
    dag_trigger_task = TriggerDagRunOperator(
        task_id='trigger_dag',
        trigger_dag_id='TriggeredDag',
        execution_date='{{ ds }}')
```
Triggered DAG code:
```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    "depends_on_past": False,
    "start_date": days_ago(1),
    "catchup": False,
}


def create_sub_dag2(parent_dag, name):
    dag_name = "{}.{}".format(parent_dag.dag_id, name)
    with DAG(dag_name,
             default_args=default_args,
             catchup=False,
             schedule_interval=None,
             max_active_runs=1,
             ) as dag:
        BashOperator(
            task_id='print_date',
            bash_command='date',
        )
    return SubDagOperator(
        subdag=dag,
        task_id=name)


def create_sub_dag(parent_dag, name):
    dag_name = "{}.{}".format(parent_dag.dag_id, name)
    with DAG(dag_name,
             default_args=default_args,
             catchup=False,
             schedule_interval=None,
             max_active_runs=1,
             ) as dag:
        t1 = BashOperator(
            task_id='print_date',
            bash_command='date',
        )
        t2 = create_sub_dag2(dag, "sub2")
        t1 >> t2
    return SubDagOperator(
        subdag=dag,
        task_id=name)


with DAG("TriggeredDag",
         default_args=default_args,
         catchup=False,
         schedule_interval=None,
         max_active_runs=1,
         ) as dag:
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )
    t2 = create_sub_dag(dag, "sub1")
    t1 >> t2
```
**Failed operator log**:
*** Reading local file: /Users/barakyeshua/airflow/logs/TriggerExample/trigger_dag/2021-04-19T08:30:00+00:00/1.log
[2021-04-20 20:11:18,450] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: TriggerExample.trigger_dag 2021-04-19T08:30:00+00:00 [queued]>
[2021-04-20 20:11:18,477] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: TriggerExample.trigger_dag 2021-04-19T08:30:00+00:00 [queued]>
[2021-04-20 20:11:18,477] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2021-04-20 20:11:18,477] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2021-04-20 20:11:18,477] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2021-04-20 20:11:18,501] {taskinstance.py:900} INFO - Executing <Task(TriggerDagRunOperator): trigger_dag> on 2021-04-19T08:30:00+00:00
[2021-04-20 20:11:18,503] {standard_task_runner.py:53} INFO - Started process 62669 to run task
[2021-04-20 20:11:18,633] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: TriggerExample.trigger_dag 2021-04-19T08:30:00+00:00 [running]> ip-192-168-1-10.ec2.internal
[2021-04-20 20:11:18,692] {logging_mixin.py:112} INFO - [2021-04-20 20:11:18,691] {dagbag.py:396} INFO - Filling up the DagBag from /Users/barakyeshua/nielsen/GitLab/nielsen-identity-airflow/airflow/dags/triggeredDag.py
[2021-04-20 20:11:18,880] {taskinstance.py:1145} ERROR - (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "dag_run_dag_id_run_id_key"
DETAIL: Key (dag_id, run_id)=(TriggeredDag.sub1.sub2, trig__2021-04-19) already exists.
[SQL: INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%(dag_id)s, %(execution_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(external_trigger)s, %(conf)s) RETURNING dag_run.id]
[parameters: {'dag_id': 'TriggeredDag.sub1.sub2', 'execution_date': <Pendulum [2021-04-19T00:00:00+00:00]>, 'start_date': datetime.datetime(2021, 4, 20, 17, 11, 18, 875681, tzinfo=<Timezone [UTC]>), 'end_date': None, 'state': 'running', 'run_id': 'trig__2021-04-19', 'external_trigger': True, 'conf': None}]
(Background on this error at: http://sqlalche.me/e/gkpj)
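The error in the log is a unique-constraint violation on the `(dag_id, run_id)` pair. As a rough illustration only, the sketch below reproduces that kind of failure with an in-memory SQLite database standing in for Postgres. The two-column table and the `insert_run` helper are simplifications made up for this example, not the real Airflow schema or API.

```python
import sqlite3

# Assumption: a reduced dag_run table that keeps only the two columns
# covered by the unique constraint named in the log.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run (dag_id TEXT, run_id TEXT, UNIQUE (dag_id, run_id))"
)


def insert_run(dag_id, run_id):
    """Hypothetical helper: insert one row, return False on a duplicate key."""
    try:
        conn.execute(
            "INSERT INTO dag_run (dag_id, run_id) VALUES (?, ?)",
            (dag_id, run_id),
        )
        return True
    except sqlite3.IntegrityError:
        return False


# Creating a run for the same subdag twice with the same run_id
first = insert_run("TriggeredDag.sub1.sub2", "trig__2021-04-19")
second = insert_run("TriggeredDag.sub1.sub2", "trig__2021-04-19")
print(first, second)
```

The first insert goes through and the second is rejected, which is consistent with the DAG still being triggered even though the operator task fails.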
Am I missing something, or is this a real bug?