Gollum999 opened a new issue, #25730:
URL: https://github.com/apache/airflow/issues/25730

   ### Apache Airflow version
   
   2.3.3
   
   ### What happened
   
   I have a DAG that contains [dynamic tasks](https://airflow.apache.org/docs/apache-airflow/stable/concepts/dynamic-task-mapping.html), and when I try to `backfill` it I consistently run into errors like the one below:
   
   ```
   Traceback (most recent call last):
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
 line 1799, in _execute_context
       self.dialect.do_executemany(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py",
 line 953, in do_executemany
       context._psycopg2_fetched_rows = xtras.execute_values(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/psycopg2/extras.py", 
line 1270, in execute_values
       cur.execute(b''.join(parts))
   psycopg2.errors.DeadlockDetected: deadlock detected
   DETAIL:  Process 3398082 waits for ShareLock on transaction 356299; blocked 
by process 3403341.
   Process 3403341 waits for ShareLock on transaction 356228; blocked by 
process 3398082.
   HINT:  See server log for query details.
   CONTEXT:  while locking tuple (0,58) in relation "dag_run"
   SQL statement "SELECT 1 FROM ONLY "public"."dag_run" x WHERE 
"dag_id"::pg_catalog.text OPERATOR(pg_catalog.=) $1::pg_catalog.text AND 
"run_id"::pg_catalog.text OPERATOR(pg_catalog.=) $2::pg_catalog.text FOR KEY 
SHARE OF x"
   
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py",
 line 847, in _execute
       self._execute_dagruns(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py",
 line 68, in wrapper
       return func(*args, **kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py",
 line 737, in _execute_dagruns
       processed_dag_run_dates = self._process_backfill_task_instances(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py",
 line 68, in wrapper
       return func(*args, **kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py",
 line 637, in _process_backfill_task_instances
       run.update_state(session=session)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py",
 line 68, in wrapper
       return func(*args, **kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagrun.py",
 line 524, in update_state
       info = self.task_instance_scheduling_decisions(session)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py",
 line 68, in wrapper
       return func(*args, **kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagrun.py",
 line 647, in task_instance_scheduling_decisions
       self.verify_integrity(missing_indexes=missing_indexes, session=session)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py",
 line 68, in wrapper
       return func(*args, **kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagrun.py",
 line 853, in verify_integrity
       self._create_task_instances(dag.dag_id, tasks, created_counts, 
hook_is_noop, session=session)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagrun.py",
 line 1046, in _create_task_instances
       session.bulk_insert_mappings(TI, tasks)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
 line 3736, in bulk_insert_mappings
       self._bulk_save_mappings(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
 line 3843, in _bulk_save_mappings
       transaction.rollback(_capture_exception=True)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py",
 line 70, in __exit__
       compat.raise_(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/util/compat.py",
 line 207, in raise_
       raise exception
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
 line 3831, in _bulk_save_mappings
       persistence._bulk_insert(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py",
 line 107, in _bulk_insert
       _emit_insert_statements(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py",
 line 1097, in _emit_insert_statements
       c = connection._execute_20(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
 line 1631, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/sql/elements.py",
 line 325, in _execute_on_connection
       return connection._execute_clauseelement(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
 line 1498, in _execute_clauseelement
       ret = self._execute_context(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
 line 1862, in _execute_context
       self._handle_dbapi_exception(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
 line 2043, in _handle_dbapi_exception
       util.raise_(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/util/compat.py",
 line 207, in raise_
       raise exception
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
 line 1799, in _execute_context
       self.dialect.do_executemany(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py",
 line 953, in do_executemany
       context._psycopg2_fetched_rows = xtras.execute_values(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/psycopg2/extras.py", 
line 1270, in execute_values
       cur.execute(b''.join(parts))
   sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock 
detected
   DETAIL:  Process 3398082 waits for ShareLock on transaction 356299; blocked 
by process 3403341.
   Process 3403341 waits for ShareLock on transaction 356228; blocked by 
process 3398082.
   HINT:  See server log for query details.
   CONTEXT:  while locking tuple (0,58) in relation "dag_run"
   SQL statement "SELECT 1 FROM ONLY "public"."dag_run" x WHERE 
"dag_id"::pg_catalog.text OPERATOR(pg_catalog.=) $1::pg_catalog.text AND 
"run_id"::pg_catalog.text OPERATOR(pg_catalog.=) $2::pg_catalog.text FOR KEY 
SHARE OF x"
   
   [SQL: INSERT INTO task_instance (task_id, dag_id, run_id, map_index, 
try_number, max_tries, hostname, unixname, pool, pool_slots, queue, 
priority_weight, operator, executor_config) VALUES (%(task_id)s, %(dag_id)s, 
%(run_id)s, %(map_index)s, %(try_number)s, %(max_tries)s, %(hostname)s, 
%(unixname)s, %(pool)s, %(pool_slots)s, %(queue)s, %(priority_weight)s, 
%(operator)s, %(executor_config)s)]
   [parameters: ({'task_id': 't2', 'dag_id': 'test_backfill_races', 'run_id': 
'backfill__2022-02-04T06:00:00+00:00', 'map_index': 8, 'try_number': 0, 
'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 'default_pool', 
'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': 
'_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary 
object at 0x7fb6bb920720>}, {'task_id': 't2', 'dag_id': 'test_backfill_races', 
'run_id': 'backfill__2022-02-04T06:00:00+00:00', 'map_index': 9, 'try_number': 
0, 'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 
'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 
'operator': '_PythonDecoratedOperator', 'executor_config': 
<psycopg2.extensions.Binary object at 0x7fb6bb920f90>}, {'task_id': 't2', 
'dag_id': 'test_backfill_races', 'run_id': 
'backfill__2022-02-04T06:00:00+00:00', 'map_index': 10, 'try_number': 0, 
'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': 
'_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary 
object at 0x7fb6bb920c30>}, {'task_id': 't2', 'dag_id': 'test_backfill_races', 
'run_id': 'backfill__2022-02-04T06:00:00+00:00', 'map_index': 11, 'try_number': 
0, 'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 
'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 
'operator': '_PythonDecoratedOperator', 'executor_config': 
<psycopg2.extensions.Binary object at 0x7fb6bb9200f0>})]
   ```
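   
   For what it's worth, while the deadlock is happening, the blocking sessions can be inspected on the Postgres side with something like the query below (diagnostic only, not part of the repro; `pg_blocking_pids()` requires PostgreSQL 9.6+):
   
   ```
   -- Show which metadata-DB sessions are blocked, and by which PIDs.
   SELECT pid,
          pg_blocking_pids(pid) AS blocked_by,
          state,
          query
     FROM pg_stat_activity
    WHERE cardinality(pg_blocking_pids(pid)) > 0;
   ```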
   
   <details>
     <summary>More log context</summary>
   
   ```
   [2022-08-15 16:10:38,545] {dagbag.py:508} INFO - Filling up the DagBag from 
/home/tsanders/airflow_standalone_sqlite/dags/test_backfill_races.py
   [2022-08-15 16:10:38,740] {task_command.py:371} INFO - Running 
<TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 
map_index=15 [queued]> on host nkclintjs.tradebot.com
   [2022-08-15 16:10:38,741] {task_command.py:371} INFO - Running 
<TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 
map_index=14 [queued]> on host nkclintjs.tradebot.com
   [2022-08-15 16:10:38,747] {task_command.py:371} INFO - Running 
<TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 
map_index=10 [queued]> on host nkclintjs.tradebot.com
   [2022-08-15 16:10:38,747] {task_command.py:371} INFO - Running 
<TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 
map_index=9 [queued]> on host nkclintjs.tradebot.com
   [2022-08-15 16:10:38,748] {task_command.py:371} INFO - Running 
<TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 
map_index=16 [queued]> on host nkclintjs.tradebot.com
   [2022-08-15 16:10:38,755] {task_command.py:371} INFO - Running 
<TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 
map_index=12 [queued]> on host nkclintjs.tradebot.com
   [2022-08-15 16:10:38,756] {task_command.py:371} INFO - Running 
<TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 
map_index=8 [queued]> on host nkclintjs.tradebot.com
   [2022-08-15 16:10:38,761] {task_command.py:371} INFO - Running 
<TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 
map_index=11 [queued]> on host nkclintjs.tradebot.com
   [2022-08-15 16:10:38,767] {task_command.py:371} INFO - Running 
<TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 
map_index=18 [queued]> on host nkclintjs.tradebot.com
   [2022-08-15 16:10:38,768] {task_command.py:371} INFO - Running 
<TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 
map_index=17 [queued]> on host nkclintjs.tradebot.com
   [2022-08-15 16:10:38,771] {task_command.py:371} INFO - Running 
<TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 
map_index=19 [queued]> on host nkclintjs.tradebot.com
   [2022-08-15 16:10:38,776] {task_command.py:371} INFO - Running 
<TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 
map_index=13 [queued]> on host nkclintjs.tradebot.com
   Traceback (most recent call last):
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
 line 1799, in _execute_context
       self.dialect.do_executemany(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py",
 line 953, in do_executemany
       context._psycopg2_fetched_rows = xtras.execute_values(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/psycopg2/extras.py", 
line 1270, in execute_values
       cur.execute(b''.join(parts))
   psycopg2.errors.DeadlockDetected: deadlock detected
   DETAIL:  Process 3398082 waits for ShareLock on transaction 356299; blocked 
by process 3403341.
   Process 3403341 waits for ShareLock on transaction 356228; blocked by 
process 3398082.
   HINT:  See server log for query details.
   CONTEXT:  while locking tuple (0,58) in relation "dag_run"
   SQL statement "SELECT 1 FROM ONLY "public"."dag_run" x WHERE 
"dag_id"::pg_catalog.text OPERATOR(pg_catalog.=) $1::pg_catalog.text AND 
"run_id"::pg_catalog.text OPERATOR(pg_catalog.=) $2::pg_catalog.text FOR KEY 
SHARE OF x"
   
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py",
 line 847, in _execute
       self._execute_dagruns(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py",
 line 68, in wrapper
       return func(*args, **kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py",
 line 737, in _execute_dagruns
       processed_dag_run_dates = self._process_backfill_task_instances(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py",
 line 68, in wrapper
       return func(*args, **kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py",
 line 637, in _process_backfill_task_instances
       run.update_state(session=session)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py",
 line 68, in wrapper
       return func(*args, **kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagrun.py",
 line 524, in update_state
       info = self.task_instance_scheduling_decisions(session)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py",
 line 68, in wrapper
       return func(*args, **kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagrun.py",
 line 647, in task_instance_scheduling_decisions
       self.verify_integrity(missing_indexes=missing_indexes, session=session)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py",
 line 68, in wrapper
       return func(*args, **kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagrun.py",
 line 853, in verify_integrity
       self._create_task_instances(dag.dag_id, tasks, created_counts, 
hook_is_noop, session=session)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagrun.py",
 line 1046, in _create_task_instances
       session.bulk_insert_mappings(TI, tasks)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
 line 3736, in bulk_insert_mappings
       self._bulk_save_mappings(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
 line 3843, in _bulk_save_mappings
       transaction.rollback(_capture_exception=True)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py",
 line 70, in __exit__
       compat.raise_(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/util/compat.py",
 line 207, in raise_
       raise exception
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
 line 3831, in _bulk_save_mappings
       persistence._bulk_insert(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py",
 line 107, in _bulk_insert
       _emit_insert_statements(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py",
 line 1097, in _emit_insert_statements
       c = connection._execute_20(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
 line 1631, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/sql/elements.py",
 line 325, in _execute_on_connection
       return connection._execute_clauseelement(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
 line 1498, in _execute_clauseelement
       ret = self._execute_context(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
 line 1862, in _execute_context
       self._handle_dbapi_exception(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
 line 2043, in _handle_dbapi_exception
       util.raise_(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/util/compat.py",
 line 207, in raise_
       raise exception
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
 line 1799, in _execute_context
       self.dialect.do_executemany(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py",
 line 953, in do_executemany
       context._psycopg2_fetched_rows = xtras.execute_values(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/psycopg2/extras.py", 
line 1270, in execute_values
       cur.execute(b''.join(parts))
   sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock 
detected
   DETAIL:  Process 3398082 waits for ShareLock on transaction 356299; blocked 
by process 3403341.
   Process 3403341 waits for ShareLock on transaction 356228; blocked by 
process 3398082.
   HINT:  See server log for query details.
   CONTEXT:  while locking tuple (0,58) in relation "dag_run"
   SQL statement "SELECT 1 FROM ONLY "public"."dag_run" x WHERE 
"dag_id"::pg_catalog.text OPERATOR(pg_catalog.=) $1::pg_catalog.text AND 
"run_id"::pg_catalog.text OPERATOR(pg_catalog.=) $2::pg_catalog.text FOR KEY 
SHARE OF x"
   
   [SQL: INSERT INTO task_instance (task_id, dag_id, run_id, map_index, 
try_number, max_tries, hostname, unixname, pool, pool_slots, queue, 
priority_weight, operator, executor_config) VALUES (%(task_id)s, %(dag_id)s, 
%(run_id)s, %(map_index)s, %(try_number)s, %(max_tries)s, %(hostname)s, 
%(unixname)s, %(pool)s, %(pool_slots)s, %(queue)s, %(priority_weight)s, 
%(operator)s, %(executor_config)s)]
   [parameters: ({'task_id': 't2', 'dag_id': 'test_backfill_races', 'run_id': 
'backfill__2022-02-04T06:00:00+00:00', 'map_index': 8, 'try_number': 0, 
'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 'default_pool', 
'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': 
'_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary 
object at 0x7fb6bb920720>}, {'task_id': 't2', 'dag_id': 'test_backfill_races', 
'run_id': 'backfill__2022-02-04T06:00:00+00:00', 'map_index': 9, 'try_number': 
0, 'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 
'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 
'operator': '_PythonDecoratedOperator', 'executor_config': 
<psycopg2.extensions.Binary object at 0x7fb6bb920f90>}, {'task_id': 't2', 
'dag_id': 'test_backfill_races', 'run_id': 
'backfill__2022-02-04T06:00:00+00:00', 'map_index': 10, 'try_number': 0, 
'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': 
'_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary 
object at 0x7fb6bb920c30>}, {'task_id': 't2', 'dag_id': 'test_backfill_races', 
'run_id': 'backfill__2022-02-04T06:00:00+00:00', 'map_index': 11, 'try_number': 
0, 'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 
'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 
'operator': '_PythonDecoratedOperator', 'executor_config': 
<psycopg2.extensions.Binary object at 0x7fb6bb9200f0>})]
   (Background on this error at: https://sqlalche.me/e/14/e3q8)
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/opt/conda/envs/production/bin/airflow", line 11, in <module>
       sys.exit(main())
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/__main__.py", 
line 38, in main
       args.func(args)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/cli/cli_parser.py",
 line 51, in command
       return func(*args, **kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", 
line 99, in wrapper
       return f(*args, **kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/cli/commands/dag_command.py",
 line 107, in dag_backfill
       dag.run(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dag.py", 
line 2288, in run
       job.run()
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/base_job.py",
 line 244, in run
       self._execute()
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py",
 line 71, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py",
 line 876, in _execute
       session.commit()
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
 line 1435, in commit
       self._transaction.commit(_to_root=self.future)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
 line 827, in commit
       self._assert_active(prepared_ok=True)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
 line 601, in _assert_active
       raise sa_exc.PendingRollbackError(
   sqlalchemy.exc.PendingRollbackError: This Session's transaction has been 
rolled back due to a previous exception during flush. To begin a new 
transaction with this Session, first issue Session.rollback(). Original 
exception was: (psycopg2.errors.DeadlockDetected) deadlock detected
   DETAIL:  Process 3398082 waits for ShareLock on transaction 356299; blocked 
by process 3403341.
   Process 3403341 waits for ShareLock on transaction 356228; blocked by 
process 3398082.
   HINT:  See server log for query details.
   CONTEXT:  while locking tuple (0,58) in relation "dag_run"
   SQL statement "SELECT 1 FROM ONLY "public"."dag_run" x WHERE 
"dag_id"::pg_catalog.text OPERATOR(pg_catalog.=) $1::pg_catalog.text AND 
"run_id"::pg_catalog.text OPERATOR(pg_catalog.=) $2::pg_catalog.text FOR KEY 
SHARE OF x"
   
   [SQL: INSERT INTO task_instance (task_id, dag_id, run_id, map_index, 
try_number, max_tries, hostname, unixname, pool, pool_slots, queue, 
priority_weight, operator, executor_config) VALUES (%(task_id)s, %(dag_id)s, 
%(run_id)s, %(map_index)s, %(try_number)s, %(max_tries)s, %(hostname)s, 
%(unixname)s, %(pool)s, %(pool_slots)s, %(queue)s, %(priority_weight)s, 
%(operator)s, %(executor_config)s)]
   [parameters: ({'task_id': 't2', 'dag_id': 'test_backfill_races', 'run_id': 
'backfill__2022-02-04T06:00:00+00:00', 'map_index': 8, 'try_number': 0, 
'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 'default_pool', 
'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': 
'_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary 
object at 0x7fb6bb920720>}, {'task_id': 't2', 'dag_id': 'test_backfill_races', 
'run_id': 'backfill__2022-02-04T06:00:00+00:00', 'map_index': 9, 'try_number': 
0, 'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 
'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 
'operator': '_PythonDecoratedOperator', 'executor_config': 
<psycopg2.extensions.Binary object at 0x7fb6bb920f90>}, {'task_id': 't2', 
'dag_id': 'test_backfill_races', 'run_id': 
'backfill__2022-02-04T06:00:00+00:00', 'map_index': 10, 'try_number': 0, 
'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': 
'_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary 
object at 0x7fb6bb920c30>}, {'task_id': 't2', 'dag_id': 'test_backfill_races', 
'run_id': 'backfill__2022-02-04T06:00:00+00:00', 'map_index': 11, 'try_number': 
0, 'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 
'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 
'operator': '_PythonDecoratedOperator', 'executor_config': 
<psycopg2.extensions.Binary object at 0x7fb6bb9200f0>})]
   (Background on this error at: https://sqlalche.me/e/14/e3q8) (Background on 
this error at: https://sqlalche.me/e/14/7s2a)
   Process QueuedLocalWorker-4:
   Process QueuedLocalWorker-5:
   Process QueuedLocalWorker-3:
   Process QueuedLocalWorker-2:
   Process QueuedLocalWorker-13:
   Process QueuedLocalWorker-6:
   Process QueuedLocalWorker-7:
   Process QueuedLocalWorker-8:
   Process QueuedLocalWorker-12:
   Process QueuedLocalWorker-11:
   Process QueuedLocalWorker-9:
   Process QueuedLocalWorker-10:
   Traceback (most recent call last):
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 190, in do_work
       self.execute_work(key=key, command=command)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 84, in execute_work
       state = self._execute_work_in_fork(command)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 102, in _execute_work_in_fork
       pid, ret = os.waitpid(pid, 0)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", 
line 271, in sigint_handler
       sys.exit(0)
   Traceback (most recent call last):
   SystemExit: 0
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
   Traceback (most recent call last):
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 190, in do_work
       self.execute_work(key=key, command=command)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 84, in execute_work
       state = self._execute_work_in_fork(command)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 102, in _execute_work_in_fork
       pid, ret = os.waitpid(pid, 0)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", 
line 271, in sigint_handler
       sys.exit(0)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
315, in _bootstrap
       self.run()
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 190, in do_work
       self.execute_work(key=key, command=command)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 67, in run
       return super().run()
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 84, in execute_work
       state = self._execute_work_in_fork(command)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
108, in run
       self._target(*self._args, **self._kwargs)
   SystemExit: 0
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 102, in _execute_work_in_fork
       pid, ret = os.waitpid(pid, 0)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 192, in do_work
       self.task_queue.task_done()
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", 
line 271, in sigint_handler
       sys.exit(0)
   
   During handling of the above exception, another exception occurred:
   
     File "<string>", line 2, in task_done
   Traceback (most recent call last):
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 
809, in _callmethod
       conn.send((self._id, methodname, args, kwds))
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
211, in send
       self._send_bytes(_ForkingPickler.dumps(obj))
   SystemExit: 0
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
416, in _send_bytes
       self._send(header + buf)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
373, in _send
       n = write(self._handle, buf)
   
   During handling of the above exception, another exception occurred:
   
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
315, in _bootstrap
       self.run()
   BrokenPipeError: [Errno 32] Broken pipe
   Traceback (most recent call last):
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 67, in run
       return super().run()
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
108, in run
       self._target(*self._args, **self._kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 192, in do_work
       self.task_queue.task_done()
     File "<string>", line 2, in task_done
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 
809, in _callmethod
       conn.send((self._id, methodname, args, kwds))
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
211, in send
       self._send_bytes(_ForkingPickler.dumps(obj))
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
416, in _send_bytes
       self._send(header + buf)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
315, in _bootstrap
       self.run()
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
373, in _send
       n = write(self._handle, buf)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 67, in run
       return super().run()
   BrokenPipeError: [Errno 32] Broken pipe
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
108, in run
       self._target(*self._args, **self._kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 192, in do_work
       self.task_queue.task_done()
     File "<string>", line 2, in task_done
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 
809, in _callmethod
       conn.send((self._id, methodname, args, kwds))
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
211, in send
       self._send_bytes(_ForkingPickler.dumps(obj))
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
416, in _send_bytes
       self._send(header + buf)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
373, in _send
       n = write(self._handle, buf)
   BrokenPipeError: [Errno 32] Broken pipe
   Traceback (most recent call last):
   Traceback (most recent call last):
   Traceback (most recent call last):
   Traceback (most recent call last):
   Traceback (most recent call last):
   Traceback (most recent call last):
   Traceback (most recent call last):
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 190, in do_work
       self.execute_work(key=key, command=command)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 190, in do_work
       self.execute_work(key=key, command=command)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 84, in execute_work
       state = self._execute_work_in_fork(command)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 84, in execute_work
       state = self._execute_work_in_fork(command)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 190, in do_work
       self.execute_work(key=key, command=command)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 102, in _execute_work_in_fork
       pid, ret = os.waitpid(pid, 0)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 190, in do_work
       self.execute_work(key=key, command=command)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 190, in do_work
       self.execute_work(key=key, command=command)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 190, in do_work
       self.execute_work(key=key, command=command)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 102, in _execute_work_in_fork
       pid, ret = os.waitpid(pid, 0)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 190, in do_work
       self.execute_work(key=key, command=command)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 84, in execute_work
       state = self._execute_work_in_fork(command)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", 
line 271, in sigint_handler
       sys.exit(0)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 84, in execute_work
       state = self._execute_work_in_fork(command)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 84, in execute_work
       state = self._execute_work_in_fork(command)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 84, in execute_work
       state = self._execute_work_in_fork(command)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", 
line 271, in sigint_handler
       sys.exit(0)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 102, in _execute_work_in_fork
       pid, ret = os.waitpid(pid, 0)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 84, in execute_work
       state = self._execute_work_in_fork(command)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 102, in _execute_work_in_fork
       pid, ret = os.waitpid(pid, 0)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 102, in _execute_work_in_fork
       pid, ret = os.waitpid(pid, 0)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 102, in _execute_work_in_fork
       pid, ret = os.waitpid(pid, 0)
   SystemExit: 0
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", 
line 271, in sigint_handler
       sys.exit(0)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 102, in _execute_work_in_fork
       pid, ret = os.waitpid(pid, 0)
   SystemExit: 0
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", 
line 271, in sigint_handler
       sys.exit(0)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", 
line 271, in sigint_handler
       sys.exit(0)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", 
line 271, in sigint_handler
       sys.exit(0)
   
   During handling of the above exception, another exception occurred:
   
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", 
line 271, in sigint_handler
       sys.exit(0)
   SystemExit: 0
   
   During handling of the above exception, another exception occurred:
   
   SystemExit: 0
   Traceback (most recent call last):
   Traceback (most recent call last):
   SystemExit: 0
   Traceback (most recent call last):
   SystemExit: 0
   
   During handling of the above exception, another exception occurred:
   
   
   During handling of the above exception, another exception occurred:
   
   SystemExit: 0
   Traceback (most recent call last):
   
   During handling of the above exception, another exception occurred:
   
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
   Traceback (most recent call last):
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
   Traceback (most recent call last):
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 190, in do_work
       self.execute_work(key=key, command=command)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
315, in _bootstrap
       self.run()
   Traceback (most recent call last):
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
315, in _bootstrap
       self.run()
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 84, in execute_work
       state = self._execute_work_in_fork(command)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 67, in run
       return super().run()
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
315, in _bootstrap
       self.run()
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 190, in do_work
       self.execute_work(key=key, command=command)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 67, in run
       return super().run()
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 102, in _execute_work_in_fork
       pid, ret = os.waitpid(pid, 0)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
315, in _bootstrap
       self.run()
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
108, in run
       self._target(*self._args, **self._kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 67, in run
       return super().run()
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
315, in _bootstrap
       self.run()
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
315, in _bootstrap
       self.run()
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 84, in execute_work
       state = self._execute_work_in_fork(command)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
108, in run
       self._target(*self._args, **self._kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
315, in _bootstrap
       self.run()
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", 
line 271, in sigint_handler
       sys.exit(0)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 67, in run
       return super().run()
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 192, in do_work
       self.task_queue.task_done()
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
108, in run
       self._target(*self._args, **self._kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 67, in run
       return super().run()
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 67, in run
       return super().run()
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 102, in _execute_work_in_fork
       pid, ret = os.waitpid(pid, 0)
   SystemExit: 0
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 192, in do_work
       self.task_queue.task_done()
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 67, in run
       return super().run()
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
108, in run
       self._target(*self._args, **self._kwargs)
     File "<string>", line 2, in task_done
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 192, in do_work
       self.task_queue.task_done()
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
108, in run
       self._target(*self._args, **self._kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
108, in run
       self._target(*self._args, **self._kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", 
line 271, in sigint_handler
       sys.exit(0)
     File "<string>", line 2, in task_done
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
108, in run
       self._target(*self._args, **self._kwargs)
   
   During handling of the above exception, another exception occurred:
   
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 192, in do_work
       self.task_queue.task_done()
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 
809, in _callmethod
       conn.send((self._id, methodname, args, kwds))
     File "<string>", line 2, in task_done
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 192, in do_work
       self.task_queue.task_done()
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 192, in do_work
       self.task_queue.task_done()
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 
809, in _callmethod
       conn.send((self._id, methodname, args, kwds))
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 192, in do_work
       self.task_queue.task_done()
   Traceback (most recent call last):
     File "<string>", line 2, in task_done
   SystemExit: 0
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
211, in send
       self._send_bytes(_ForkingPickler.dumps(obj))
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 
809, in _callmethod
       conn.send((self._id, methodname, args, kwds))
     File "<string>", line 2, in task_done
     File "<string>", line 2, in task_done
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
211, in send
       self._send_bytes(_ForkingPickler.dumps(obj))
     File "<string>", line 2, in task_done
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 
809, in _callmethod
       conn.send((self._id, methodname, args, kwds))
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
416, in _send_bytes
       self._send(header + buf)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
211, in send
       self._send_bytes(_ForkingPickler.dumps(obj))
   
   During handling of the above exception, another exception occurred:
   
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 
809, in _callmethod
       conn.send((self._id, methodname, args, kwds))
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 
809, in _callmethod
       conn.send((self._id, methodname, args, kwds))
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
211, in send
       self._send_bytes(_ForkingPickler.dumps(obj))
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
416, in _send_bytes
       self._send(header + buf)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 
809, in _callmethod
       conn.send((self._id, methodname, args, kwds))
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
211, in send
       self._send_bytes(_ForkingPickler.dumps(obj))
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
373, in _send
       n = write(self._handle, buf)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
416, in _send_bytes
       self._send(header + buf)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
315, in _bootstrap
       self.run()
   Traceback (most recent call last):
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
211, in send
       self._send_bytes(_ForkingPickler.dumps(obj))
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
416, in _send_bytes
       self._send(header + buf)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
416, in _send_bytes
       self._send(header + buf)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
373, in _send
       n = write(self._handle, buf)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
211, in send
       self._send_bytes(_ForkingPickler.dumps(obj))
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
416, in _send_bytes
       self._send(header + buf)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
373, in _send
       n = write(self._handle, buf)
   BrokenPipeError: [Errno 32] Broken pipe
   BrokenPipeError: [Errno 32] Broken pipe
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 67, in run
       return super().run()
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
373, in _send
       n = write(self._handle, buf)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
373, in _send
       n = write(self._handle, buf)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
416, in _send_bytes
       self._send(header + buf)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
315, in _bootstrap
       self.run()
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
373, in _send
       n = write(self._handle, buf)
   BrokenPipeError: [Errno 32] Broken pipe
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
108, in run
       self._target(*self._args, **self._kwargs)
   BrokenPipeError: [Errno 32] Broken pipe
   BrokenPipeError: [Errno 32] Broken pipe
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
373, in _send
       n = write(self._handle, buf)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 67, in run
       return super().run()
   BrokenPipeError: [Errno 32] Broken pipe
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 192, in do_work
       self.task_queue.task_done()
   BrokenPipeError: [Errno 32] Broken pipe
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 
108, in run
       self._target(*self._args, **self._kwargs)
     File "<string>", line 2, in task_done
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py",
 line 192, in do_work
       self.task_queue.task_done()
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 
809, in _callmethod
       conn.send((self._id, methodname, args, kwds))
     File "<string>", line 2, in task_done
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
211, in send
       self._send_bytes(_ForkingPickler.dumps(obj))
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 
809, in _callmethod
       conn.send((self._id, methodname, args, kwds))
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
416, in _send_bytes
       self._send(header + buf)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
211, in send
       self._send_bytes(_ForkingPickler.dumps(obj))
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
373, in _send
       n = write(self._handle, buf)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
416, in _send_bytes
       self._send(header + buf)
     File 
"/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 
373, in _send
       n = write(self._handle, buf)
   BrokenPipeError: [Errno 32] Broken pipe
   BrokenPipeError: [Errno 32] Broken pipe
   ```
   </details>
   
   When I tried to run this backfill on our production Airflow deployment, I 
got the following error instead:
   
   ```
   Traceback (most recent call last):
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
 line 1799, in _execute_context
       self.dialect.do_executemany(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py",
 line 953, in do_executemany
       context._psycopg2_fetched_rows = xtras.execute_values(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/psycopg2/extras.py", 
line 1270, in execute_values
       cur.execute(b''.join(parts))
   psycopg2.errors.UniqueViolation: duplicate key value violates unique 
constraint "task_instance_pkey"
   DETAIL:  Key (dag_id, task_id, run_id, map_index)=(test_backfill_races, t2, 
backfill__2022-01-04T00:00:00-06:00, 24) already exists.
   
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagrun.py",
 line 1048, in _create_task_instances
       session.bulk_save_objects(tasks)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
 line 3627, in bulk_save_objects
       self._bulk_save_mappings(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
 line 3843, in _bulk_save_mappings
       transaction.rollback(_capture_exception=True)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py",
 line 70, in __exit__
       compat.raise_(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/util/compat.py",
 line 207, in raise_
       raise exception
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
 line 3831, in _bulk_save_mappings
       persistence._bulk_insert(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py",
 line 107, in _bulk_insert
       _emit_insert_statements(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py",
 line 1097, in _emit_insert_statements
       c = connection._execute_20(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
 line 1631, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/sql/elements.py",
 line 325, in _execute_on_connection
       return connection._execute_clauseelement(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
 line 1498, in _execute_clauseelement
       ret = self._execute_context(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
 line 1862, in _execute_context
       self._handle_dbapi_exception(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
 line 2043, in _handle_dbapi_exception
       util.raise_(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/util/compat.py",
 line 207, in raise_
       raise exception
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
 line 1799, in _execute_context
       self.dialect.do_executemany(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py",
 line 953, in do_executemany
       context._psycopg2_fetched_rows = xtras.execute_values(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/psycopg2/extras.py", 
line 1270, in execute_values
       cur.execute(b''.join(parts))
   sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate 
key value violates unique constraint "task_instance_pkey"
   DETAIL:  Key (dag_id, task_id, run_id, map_index)=(test_backfill_races, t2, 
backfill__2022-01-04T00:00:00-06:00, 24) already exists.
   
   [SQL: INSERT INTO task_instance (task_id, dag_id, run_id, map_index, 
try_number, max_tries, hostname, unixname, pool, pool_slots, queue, 
priority_weight, operator, executor_config) VALUES (%(task_id)s, %(dag_id)s, 
%(run_id)s, %(map_index)s, %(try_number)s, %(max_tries)s, %(hostname)s, 
%(unixname)s, %(pool)s, %(pool_slots)s, %(queue)s, %(priority_weight)s, 
%(operator)s, %(executor_config)s)]
   [parameters: ({'task_id': 't2', 'dag_id': 'test_backfill_races', 'run_id': 
'backfill__2022-01-04T00:00:00-06:00', 'map_index': 24, 'try_number': 0, 
'max_tries': 0, 'hostname': '', 'unixname': 'tradebot', 'pool': 'default_pool', 
'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': 
'_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary 
object at 0x7f2522faf2d0>}, {'task_id': 't2', 'dag_id': 'test_backfill_races', 
'run_id': 'backfill__2022-01-04T00:00:00-06:00', 'map_index': 25, 'try_number': 
0, 'max_tries': 0, 'hostname': '', 'unixname': 'tradebot', 'pool': 
'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 
'operator': '_PythonDecoratedOperator', 'executor_config': 
<psycopg2.extensions.Binary object at 0x7f2522faf0f0>}, {'task_id': 't2', 
'dag_id': 'test_backfill_races', 'run_id': 
'backfill__2022-01-04T00:00:00-06:00', 'map_index': 26, 'try_number': 0, 
'max_tries': 0, 'hostname': '', 'unixname': 'tradebot', 'pool': 'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 
'operator': '_PythonDecoratedOperator', 'executor_config': 
<psycopg2.extensions.Binary object at 0x7f2522faf8d0>}, {'task_id': 't2', 
'dag_id': 'test_backfill_races', 'run_id': 
'backfill__2022-01-04T00:00:00-06:00', 'map_index': 27, 'try_number': 0, 
'max_tries': 0, 'hostname': '', 'unixname': 'tradebot', 'pool': 'default_pool', 
'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': 
'_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary 
object at 0x7f2522fafbd0>})]
   (Background on this error at: https://sqlalche.me/e/14/gkpj)
   ```
   
   ### What you think should happen instead
   
   It should be possible to backfill mapped tasks just like "normal" (unmapped) tasks.
   
   ### How to reproduce
   
   ```
   #!/usr/bin/env python3
   import datetime
   import itertools
   import logging
   import string
   
   from airflow.decorators import dag, task
   
   
   logger = logging.getLogger(__name__)
   
   TASK_COUNT = 70  # turning this up increases the likelihood of issues
   
   
   @dag(
       schedule_interval='@daily',
       start_date=datetime.datetime(2022, 8, 15),
       default_args={
           'retries': 0,
       },
   )
   def test_backfill_races():
       @task
       def get_tasks():
            return list(itertools.islice(itertools.cycle(string.ascii_letters), TASK_COUNT))
   
       @task
       def t1(arg):
           logger.info(f'{arg=}')
           return arg
   
       @task
       def t2(arg):
           logger.info(f'{arg=}')
   
   
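    # Two layers of mapped tasks: t1 fans out over the items from
    # get_tasks(), then t2 maps over each of t1's results.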
       t2.expand(arg=t1.expand(arg=get_tasks()))
   
   
   dag = test_backfill_races()
   
   
   if __name__ == '__main__':
       dag.cli()
   ```
   ```
   airflow dags backfill test_backfill_races -s 2022-08-01 -e 2022-08-10
   ```
   
   ### Operating System
   
   CentOS Stream 8
   
   ### Versions of Apache Airflow Providers
   
   None
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   1. Standalone w/ Postgres DB backend.  Relevant configs:
     a. `core.executor = LocalExecutor`
     b. `core.parallelism = 12`
     c. `core.max_active_tasks_per_dag = 16`
     d. `core.max_active_runs_per_dag = 16`
   
   2. Self-hosted w/ Postgres DB backend.  Relevant configs:
     a. `core.executor = CeleryExecutor`
     b. `core.parallelism = 256`
     c. `core.max_active_tasks_per_dag = 128`
     d. `core.max_active_runs_per_dag = 16`
     e. `celery.worker_concurrency = 16`
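   
   For reference, the deployment 1 settings map to an `airflow.cfg` roughly like this (a sketch reconstructed from the list above; deployment 2 uses the same `[core]` keys plus `[celery] worker_concurrency`):
   
   ```
   [core]
   executor = LocalExecutor
   parallelism = 12
   max_active_tasks_per_dag = 16
   max_active_runs_per_dag = 16
   ```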
   
   
   ### Anything else
   
   Possibly related: #16982
   
   Note that both failures occur in `DagRun.verify_integrity` while it is creating the mapped task instances (`_create_task_instances`), which suggests concurrent backfill processes are racing to expand the same DAG run. Turning down the effective parallelism of the DAG (e.g. `max_active_tasks=4`) sometimes helps to avoid this, but (obviously) puts a significant bottleneck on the backfill.
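   
   For completeness, the throttled variant looks like this (a sketch; the repro DAG above with only `max_active_tasks` added):
   
   ```
   @dag(
       schedule_interval='@daily',
       start_date=datetime.datetime(2022, 8, 15),
       default_args={'retries': 0},
       max_active_tasks=4,  # throttles every task in this DAG, so the backfill slows down a lot
   )
   def test_backfill_races():
       ...
   ```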
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   