linax101 opened a new issue #8181: Duplicate key value violates unique 
constraint "task_instance_pkey" error
URL: https://github.com/apache/airflow/issues/8181
 
 
   I'm using Airflow 1.10.9 with the CeleryExecutor and Postgres. My DAG needs 
a nested loop: the outer level has 2 options and the inner level has 1800 
options to loop through.
   
   If I go above 600 options in the inner loop, I get a duplicate key value 
error. It looks as if either two Postgres sessions are writing at the same 
time or something else is triggering a duplicate entry for my first task.
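For context, here is a minimal sketch of how this class of error arises. It uses SQLite in place of Postgres so that it runs without a server, and it assumes the key behind `task_instance_pkey` is the composite (task_id, dag_id, execution_date), as in the Airflow 1.10 schema. The table is a cut-down stand-in and the execution date is a made-up value, not taken from my logs.

```python
import sqlite3


def insert_twice():
    """Insert the same task instance key twice and return the error text."""
    conn = sqlite3.connect(":memory:")
    # Cut-down stand-in for Airflow's task_instance table (assumed key columns).
    conn.execute(
        "CREATE TABLE task_instance ("
        " task_id TEXT, dag_id TEXT, execution_date TEXT, state TEXT,"
        " PRIMARY KEY (task_id, dag_id, execution_date))"
    )
    # Hypothetical row: the execution date is illustrative only.
    row = ("run_this_first", "branch_demo", "2020-04-06T11:00:00", "scheduled")
    conn.execute("INSERT INTO task_instance VALUES (?, ?, ?, ?)", row)
    try:
        # A second writer creating the same key hits the constraint.
        conn.execute("INSERT INTO task_instance VALUES (?, ?, ?, ?)", row)
    except sqlite3.IntegrityError as exc:
        return str(exc)
    finally:
        conn.close()
    return None


error = insert_twice()
print(error)
```

If the key assumption holds, the Postgres error would mean that two code paths each tried to create the row for the same task, DAG and execution date.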
   
   The example DAG below reproduces the issue once the range for `j` is 
raised from 10 to 1800.
   
   The setup runs in Docker, using this docker-compose file:
   
https://github.com/puckel/docker-airflow/blob/master/docker-compose-CeleryExecutor.yml
   
   `from airflow.operators.python_operator import BranchPythonOperator
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.operators.bash_operator import BashOperator
   from datetime import datetime, date, timedelta
   from airflow.models import DAG
   import random
   import csv
   from airflow.utils.dates import days_ago
   from airflow.models import Variable
   
   
   default_args = {
       'owner': 'airflow',
       'depends_on_past': False,
       'start_date': days_ago(2),
       'email_on_failure': False,
       'email_on_retry': False,
       'retries': 1,
       'retry_delay': timedelta(minutes=5),
   }
   
   
   
   def return_branch(**kwargs):
   
       # NOTE: only branch_0 and branch_1 exist as task ids in this DAG
       branches = ['branch_0', 'branch_1', 'branch_2', 'branch_3', 'branch_4']
   
       return random.choice(branches)
   
   with DAG(
       dag_id='branch_demo',
       default_args=default_args,
       dagrun_timeout=timedelta(hours=2),
       schedule_interval='0 11 * * *'
   ) as dag:
       kick_off_dag = BashOperator(task_id='run_this_first', bash_command='echo "first"')
   
       branching = BranchPythonOperator(
           task_id='branching',
           python_callable=return_branch,
           provide_context=True)
   
       kick_off_dag >> branching
   
       for i in range(2):
           d = BashOperator(task_id='branch_{0}'.format(i), bash_command='echo "job"')
           for j in range(1800):
               m = BashOperator(task_id='branch_{0}_{1}'.format(i, j), bash_command='echo "done"')
   
               d >> m
   
           branching >> d`
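
As a sanity check that the duplicate does not come from the DAG definition itself, the task ids produced by the loops above can be rebuilt in plain Python and checked for collisions. This is only a sketch: `generated_task_ids` is a helper name made up for this check, and it mirrors the `format` calls in the DAG without importing Airflow.

```python
from collections import Counter


def generated_task_ids(outer=2, inner=1800):
    """Rebuild the task ids the example DAG defines (hypothetical helper)."""
    ids = ["run_this_first", "branching"]
    for i in range(outer):
        ids.append("branch_{0}".format(i))
        for j in range(inner):
            ids.append("branch_{0}_{1}".format(i, j))
    return ids


ids = generated_task_ids()
# Any id that appears more than once would be a duplicate in the DAG file.
duplicates = [task_id for task_id, n in Counter(ids).items() if n > 1]
print(len(ids), duplicates)  # prints: 3604 []
```

All 3604 ids are distinct, so the DAG file does not define any task twice, and the duplicate row would have to come from how the task instances are created at run time.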
