baraknielsen opened a new issue #15458:
URL: https://github.com/apache/airflow/issues/15458


   **Apache Airflow version**:
   1.10.10
   
   **What happened**:
   
   Hi all,
   
   When I trigger my DAG, which has two levels of nested subdags, from another DAG, the triggering task fails with an error saying that the key for one of the subdags already exists, even though the DAG does eventually get triggered.
   
   Looking at the Airflow code, I saw that calling `main_dag.subdags` returns all the subdags inside the DAG, including the ones nested within its own subdags.
   
   So when I run the TriggerDagRunOperator, it tries to trigger the second-level subdags twice because of this Airflow code:
   
   ```python
       while dags_to_trigger:
           dag = dags_to_trigger.pop()
           trigger = dag.create_dagrun(
               run_id=run_id,
               execution_date=execution_date,
               state=State.RUNNING,
               conf=run_conf,
               external_trigger=True,
           )
           triggers.append(trigger)
           if dag.subdags:
               dags_to_trigger.extend(dag.subdags)
   ```
   
   (from `airflow/api/common/experimental/trigger_dag.py`, line 91)
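To double-check this reasoning without an Airflow install, the sketch below replays the same `while` loop against a hypothetical `FakeDag` class. `FakeDag` is an assumption standing in for `airflow.models.DAG`: it models only a `subdags` property that, as described above, returns nested subdags as well as direct children. With the DAG tree from the reproduction (`TriggeredDag` → `sub1` → `sub2`), the loop visits `TriggeredDag.sub1.sub2` twice, which matches the duplicate key in the log. `trigger_ids_dedup` and `trigger_ids_flat` are hypothetical fixes for illustration only, not the upstream patch.

```python
class FakeDag:
    """Hypothetical stand-in for airflow.models.DAG (only what the loop touches)."""

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.children = []  # direct subdags, i.e. one per SubDagOperator

    @property
    def subdags(self):
        # Mirrors the behaviour described in the issue: return every
        # nested subdag, not only the direct children.
        result = []
        for child in self.children:
            result.append(child)
            result.extend(child.subdags)
        return result


def build_example_tree():
    """TriggeredDag -> TriggeredDag.sub1 -> TriggeredDag.sub1.sub2, as in the repro."""
    root = FakeDag("TriggeredDag")
    sub1 = FakeDag("TriggeredDag.sub1")
    sub2 = FakeDag("TriggeredDag.sub1.sub2")
    root.children.append(sub1)
    sub1.children.append(sub2)
    return root


def trigger_ids(root):
    """Replay the loop from trigger_dag.py, recording dag_ids instead of creating runs."""
    triggered = []
    dags_to_trigger = [root]
    while dags_to_trigger:
        dag = dags_to_trigger.pop()
        triggered.append(dag.dag_id)  # stands in for dag.create_dagrun(...)
        if dag.subdags:
            dags_to_trigger.extend(dag.subdags)
    return triggered


def trigger_ids_dedup(root):
    """Hypothetical fix: skip any dag_id that has already been triggered."""
    triggered = []
    seen = set()
    dags_to_trigger = [root]
    while dags_to_trigger:
        dag = dags_to_trigger.pop()
        if dag.dag_id in seen:
            continue
        seen.add(dag.dag_id)
        triggered.append(dag.dag_id)
        dags_to_trigger.extend(dag.subdags)
    return triggered


def trigger_ids_flat(root):
    """Hypothetical alternative: subdags already recurses, so walk it exactly once."""
    return [dag.dag_id for dag in [root] + root.subdags]


if __name__ == "__main__":
    root = build_example_tree()
    # ['TriggeredDag', 'TriggeredDag.sub1.sub2', 'TriggeredDag.sub1', 'TriggeredDag.sub1.sub2']
    print(trigger_ids(root))
    # ['TriggeredDag', 'TriggeredDag.sub1.sub2', 'TriggeredDag.sub1']
    print(trigger_ids_dedup(root))
```

The duplicate appears because the loop both relies on `subdags` being recursive and recurses itself, so every subdag below the first level is queued once per ancestor.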
   
   
   **What you expected to happen**:
   
   The TriggerDagRunOperator task should finish in the success state.
   
   **How to reproduce it**:
   
   Triggering DAG code:
   
   ```python
   from airflow import DAG
   from airflow.operators.dagrun_operator import TriggerDagRunOperator
   from airflow.utils.dates import days_ago
   
   interval = '30 8 * * *'
   
   default_args = {
       'owner': 'airflow',
       "depends_on_past": False,
       "start_date": days_ago(1),
       "catchup": False,
   }
   
   with DAG("TriggerExample",
            default_args=default_args,
            catchup=False,
            schedule_interval=interval,
            max_active_runs=1,
            ) as dag:
       dag_trigger_task = TriggerDagRunOperator(
           task_id='trigger_dag',
           trigger_dag_id='TriggeredDag',
           execution_date='{{ ds }}')
   ```
   
   Triggered DAG code:
   
   
   ```python
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from airflow.operators.subdag_operator import SubDagOperator
   from airflow.utils.dates import days_ago
   
   default_args = {
       'owner': 'airflow',
       "depends_on_past": False,
       "start_date": days_ago(1),
       "catchup": False,
   }
   
   
   def create_sub_dag2(parent_dag, name):
       dag_name = "{}.{}".format(parent_dag.dag_id, name)
       with DAG(dag_name,
                default_args=default_args,
                catchup=False,
                schedule_interval=None,
                max_active_runs=1,
                ) as dag:
   
           BashOperator(
               task_id='print_date',
               bash_command='date',
           )
   
       return SubDagOperator(
           subdag=dag,
           task_id=name)
   
   
   def create_sub_dag(parent_dag, name):
       dag_name = "{}.{}".format(parent_dag.dag_id, name)
       with DAG(dag_name,
                default_args=default_args,
                catchup=False,
                schedule_interval=None,
                max_active_runs=1,
                ) as dag:
           t1 = BashOperator(
               task_id='print_date',
               bash_command='date',
           )
   
           t2 = create_sub_dag2(dag, "sub2")
   
           t1 >> t2
   
       return SubDagOperator(
           subdag=dag,
           task_id=name)
   
   
   with DAG("TriggeredDag",
            default_args=default_args,
            catchup=False,
            schedule_interval=None,
            max_active_runs=1,
            ) as dag:
       t1 = BashOperator(
           task_id='print_date',
           bash_command='date',
       )
   
       t2 = create_sub_dag(dag, "sub1")
   
       t1 >> t2
   ```
   
   **Failed operator log**:
   
   *** Reading local file: /Users/barakyeshua/airflow/logs/TriggerExample/trigger_dag/2021-04-19T08:30:00+00:00/1.log
   [2021-04-20 20:11:18,450] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: TriggerExample.trigger_dag 2021-04-19T08:30:00+00:00 [queued]>
   [2021-04-20 20:11:18,477] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: TriggerExample.trigger_dag 2021-04-19T08:30:00+00:00 [queued]>
   [2021-04-20 20:11:18,477] {taskinstance.py:879} INFO - 
   --------------------------------------------------------------------------------
   [2021-04-20 20:11:18,477] {taskinstance.py:880} INFO - Starting attempt 1 of 1
   [2021-04-20 20:11:18,477] {taskinstance.py:881} INFO - 
   --------------------------------------------------------------------------------
   [2021-04-20 20:11:18,501] {taskinstance.py:900} INFO - Executing <Task(TriggerDagRunOperator): trigger_dag> on 2021-04-19T08:30:00+00:00
   [2021-04-20 20:11:18,503] {standard_task_runner.py:53} INFO - Started process 62669 to run task
   [2021-04-20 20:11:18,633] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: TriggerExample.trigger_dag 2021-04-19T08:30:00+00:00 [running]> ip-192-168-1-10.ec2.internal
   [2021-04-20 20:11:18,692] {logging_mixin.py:112} INFO - [2021-04-20 20:11:18,691] {dagbag.py:396} INFO - Filling up the DagBag from /Users/barakyeshua/nielsen/GitLab/nielsen-identity-airflow/airflow/dags/triggeredDag.py
   [2021-04-20 20:11:18,880] {taskinstance.py:1145} ERROR - (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "dag_run_dag_id_run_id_key"
   DETAIL:  Key (dag_id, run_id)=(TriggeredDag.sub1.sub2, trig__2021-04-19) already exists.
   
   [SQL: INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%(dag_id)s, %(execution_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(external_trigger)s, %(conf)s) RETURNING dag_run.id]
   [parameters: {'dag_id': 'TriggeredDag.sub1.sub2', 'execution_date': <Pendulum [2021-04-19T00:00:00+00:00]>, 'start_date': datetime.datetime(2021, 4, 20, 17, 11, 18, 875681, tzinfo=<Timezone [UTC]>), 'end_date': None, 'state': 'running', 'run_id': 'trig__2021-04-19', 'external_trigger': True, 'conf': None}]
   (Background on this error at: http://sqlalche.me/e/gkpj)
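The error comes from the `dag_run_dag_id_run_id_key` unique constraint on `(dag_id, run_id)`. The sketch below reproduces only that constraint with Python's built-in `sqlite3` on a simplified, hypothetical `dag_run` table (the real table has more columns, and this deployment uses PostgreSQL, where the failure surfaces as `psycopg2.errors.UniqueViolation` rather than `sqlite3.IntegrityError`). It suggests that sharing the run_id `trig__2021-04-19` across the parent DAG and its subdags is fine; the insert only fails when the same `dag_id` is written twice, as happens for `TriggeredDag.sub1.sub2`.

```python
import sqlite3

# Simplified, hypothetical stand-in for Airflow's dag_run table: only the
# columns covered by the dag_run_dag_id_run_id_key constraint are modeled.
SCHEMA = (
    "CREATE TABLE dag_run ("
    " id INTEGER PRIMARY KEY,"
    " dag_id TEXT NOT NULL,"
    " run_id TEXT NOT NULL,"
    " CONSTRAINT dag_run_dag_id_run_id_key UNIQUE (dag_id, run_id))"
)


def insert_runs(rows):
    """Insert (dag_id, run_id) pairs in order.

    Returns the IntegrityError message if a row violates the unique
    constraint, or None if every insert succeeds.
    """
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(SCHEMA)
        for dag_id, run_id in rows:
            conn.execute(
                "INSERT INTO dag_run (dag_id, run_id) VALUES (?, ?)",
                (dag_id, run_id),
            )
    except sqlite3.IntegrityError as exc:
        return str(exc)
    finally:
        conn.close()
    return None


if __name__ == "__main__":
    run_id = "trig__2021-04-19"
    # One run per DAG in the tree, all sharing run_id: prints None.
    print(insert_runs([
        ("TriggeredDag", run_id),
        ("TriggeredDag.sub1", run_id),
        ("TriggeredDag.sub1.sub2", run_id),
    ]))
    # The order the trigger loop actually produces: the second
    # TriggeredDag.sub1.sub2 row violates the constraint, so the
    # IntegrityError message is printed instead of None.
    print(insert_runs([
        ("TriggeredDag", run_id),
        ("TriggeredDag.sub1.sub2", run_id),
        ("TriggeredDag.sub1", run_id),
        ("TriggeredDag.sub1.sub2", run_id),
    ]))
```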
   
   
   
   Am I missing something, or is this a real bug?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
