Gollum999 opened a new issue, #25730: URL: https://github.com/apache/airflow/issues/25730
### Apache Airflow version 2.3.3 ### What happened I have a DAG that contains [dynamic tasks](https://airflow.apache.org/docs/apache-airflow/stable/concepts/dynamic-task-mapping.html), and when I try to `backfill` it I consistently run into a errors like the one below: ``` Traceback (most recent call last): File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1799, in _execute_context self.dialect.do_executemany( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 953, in do_executemany context._psycopg2_fetched_rows = xtras.execute_values( File "/opt/conda/envs/production/lib/python3.9/site-packages/psycopg2/extras.py", line 1270, in execute_values cur.execute(b''.join(parts)) psycopg2.errors.DeadlockDetected: deadlock detected DETAIL: Process 3398082 waits for ShareLock on transaction 356299; blocked by process 3403341. Process 3403341 waits for ShareLock on transaction 356228; blocked by process 3398082. HINT: See server log for query details. CONTEXT: while locking tuple (0,58) in relation "dag_run" SQL statement "SELECT 1 FROM ONLY "public"."dag_run" x WHERE "dag_id"::pg_catalog.text OPERATOR(pg_catalog.=) $1::pg_catalog.text AND "run_id"::pg_catalog.text OPERATOR(pg_catalog.=) $2::pg_catalog.text FOR KEY SHARE OF x" The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 847, in _execute self._execute_dagruns( File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper return func(*args, **kwargs) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 737, in _execute_dagruns processed_dag_run_dates = self._process_backfill_task_instances( File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper return func(*args, **kwargs) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 637, in _process_backfill_task_instances run.update_state(session=session) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper return func(*args, **kwargs) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagrun.py", line 524, in update_state info = self.task_instance_scheduling_decisions(session) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper return func(*args, **kwargs) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagrun.py", line 647, in task_instance_scheduling_decisions self.verify_integrity(missing_indexes=missing_indexes, session=session) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper return func(*args, **kwargs) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagrun.py", line 853, in verify_integrity self._create_task_instances(dag.dag_id, tasks, created_counts, hook_is_noop, session=session) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagrun.py", line 1046, in _create_task_instances session.bulk_insert_mappings(TI, tasks) File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3736, in bulk_insert_mappings self._bulk_save_mappings( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3843, in _bulk_save_mappings transaction.rollback(_capture_exception=True) File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__ compat.raise_( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_ raise exception File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3831, in _bulk_save_mappings persistence._bulk_insert( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 107, in _bulk_insert _emit_insert_statements( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 1097, in _emit_insert_statements c = connection._execute_20( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1631, in _execute_20 return meth(self, args_10style, kwargs_10style, execution_options) File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 325, in _execute_on_connection return connection._execute_clauseelement( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1498, in _execute_clauseelement ret = self._execute_context( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1862, in _execute_context self._handle_dbapi_exception( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2043, in _handle_dbapi_exception util.raise_( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_ raise exception File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1799, in _execute_context self.dialect.do_executemany( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 953, in do_executemany context._psycopg2_fetched_rows = xtras.execute_values( File "/opt/conda/envs/production/lib/python3.9/site-packages/psycopg2/extras.py", line 1270, in execute_values cur.execute(b''.join(parts)) sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected DETAIL: Process 3398082 waits for ShareLock on transaction 356299; blocked by process 3403341. Process 3403341 waits for ShareLock on transaction 356228; blocked by process 3398082. HINT: See server log for query details. CONTEXT: while locking tuple (0,58) in relation "dag_run" SQL statement "SELECT 1 FROM ONLY "public"."dag_run" x WHERE "dag_id"::pg_catalog.text OPERATOR(pg_catalog.=) $1::pg_catalog.text AND "run_id"::pg_catalog.text OPERATOR(pg_catalog.=) $2::pg_catalog.text FOR KEY SHARE OF x" [SQL: INSERT INTO task_instance (task_id, dag_id, run_id, map_index, try_number, max_tries, hostname, unixname, pool, pool_slots, queue, priority_weight, operator, executor_config) VALUES (%(task_id)s, %(dag_id)s, %(run_id)s, %(map_index)s, %(try_number)s, %(max_tries)s, %(hostname)s, %(unixname)s, %(pool)s, %(pool_slots)s, %(queue)s, %(priority_weight)s, %(operator)s, %(executor_config)s)] [parameters: ({'task_id': 't2', 'dag_id': 'test_backfill_races', 'run_id': 'backfill__2022-02-04T06:00:00+00:00', 'map_index': 8, 'try_number': 0, 'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': '_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary object at 0x7fb6bb920720>}, {'task_id': 't2', 'dag_id': 'test_backfill_races', 'run_id': 'backfill__2022-02-04T06:00:00+00:00', 'map_index': 9, 'try_number': 0, 'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': '_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary object at 0x7fb6bb920f90>}, {'task_id': 't2', 'dag_id': 'test_backfill_races', 'run_id': 'backfill__2022-02-04T06:00:00+00:00', 'map_index': 10, 'try_number': 0, 'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 'default_ pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': '_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary object at 0x7fb6bb920c30>}, {'task_id': 't2', 'dag_id': 'test_backfill_races', 'run_id': 'backfill__2022-02-04T06:00:00+00:00', 'map_index': 11, 'try_number': 0, 'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': '_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary object at 0x7fb6bb9200f0>})] ``` <details> <summary>More log context</summary> ``` [2022-08-15 16:10:38,545] {dagbag.py:508} INFO - Filling up the DagBag from /home/tsanders/airflow_standalone_sqlite/dags/test_backfill_races.py [2022-08-15 16:10:38,740] {task_command.py:371} INFO - Running <TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 map_index=15 [queued]> on host nkclintjs.tradebot.com [2022-08-15 16:10:38,741] {task_command.py:371} INFO - Running <TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 map_index=14 [queued]> on host nkclintjs.tradebot.com [2022-08-15 16:10:38,747] {task_command.py:371} INFO - Running <TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 map_index=10 [queued]> on host nkclintjs.tradebot.com [2022-08-15 16:10:38,747] {task_command.py:371} INFO - Running <TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 map_index=9 [queued]> on host nkclintjs.tradebot.com [2022-08-15 16:10:38,748] {task_command.py:371} INFO - Running <TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 map_index=16 [queued]> on host nkclintjs.tradebot.com [2022-08-15 16:10:38,755] {task_command.py:371} INFO - Running <TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 map_index=12 [queued]> on host nkclintjs.tradebot.com [2022-08-15 16:10:38,756] {task_command.py:371} INFO - Running <TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 map_index=8 [queued]> on host nkclintjs.tradebot.com [2022-08-15 16:10:38,761] {task_command.py:371} INFO - Running <TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 map_index=11 [queued]> on host nkclintjs.tradebot.com [2022-08-15 16:10:38,767] {task_command.py:371} INFO - Running <TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 map_index=18 [queued]> on host nkclintjs.tradebot.com [2022-08-15 16:10:38,768] {task_command.py:371} INFO - Running <TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 map_index=17 [queued]> on host nkclintjs.tradebot.com [2022-08-15 16:10:38,771] {task_command.py:371} INFO - Running <TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 map_index=19 [queued]> on host nkclintjs.tradebot.com [2022-08-15 16:10:38,776] {task_command.py:371} INFO - Running <TaskInstance: test_backfill_races.t1 backfill__2022-02-04T06:00:00+00:00 map_index=13 [queued]> on host nkclintjs.tradebot.com Traceback (most recent call last): File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1799, in _execute_context self.dialect.do_executemany( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 953, in do_executemany context._psycopg2_fetched_rows = xtras.execute_values( File "/opt/conda/envs/production/lib/python3.9/site-packages/psycopg2/extras.py", line 1270, in execute_values cur.execute(b''.join(parts)) psycopg2.errors.DeadlockDetected: deadlock detected DETAIL: Process 3398082 waits for ShareLock on transaction 356299; blocked by process 3403341. Process 3403341 waits for ShareLock on transaction 356228; blocked by process 3398082. HINT: See server log for query details. CONTEXT: while locking tuple (0,58) in relation "dag_run" SQL statement "SELECT 1 FROM ONLY "public"."dag_run" x WHERE "dag_id"::pg_catalog.text OPERATOR(pg_catalog.=) $1::pg_catalog.text AND "run_id"::pg_catalog.text OPERATOR(pg_catalog.=) $2::pg_catalog.text FOR KEY SHARE OF x" The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 847, in _execute self._execute_dagruns( File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper return func(*args, **kwargs) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 737, in _execute_dagruns processed_dag_run_dates = self._process_backfill_task_instances( File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper return func(*args, **kwargs) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 637, in _process_backfill_task_instances run.update_state(session=session) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper return func(*args, **kwargs) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagrun.py", line 524, in update_state info = self.task_instance_scheduling_decisions(session) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper return func(*args, **kwargs) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagrun.py", line 647, in task_instance_scheduling_decisions self.verify_integrity(missing_indexes=missing_indexes, session=session) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper return func(*args, **kwargs) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagrun.py", line 853, in verify_integrity self._create_task_instances(dag.dag_id, tasks, created_counts, hook_is_noop, session=session) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagrun.py", line 1046, in _create_task_instances session.bulk_insert_mappings(TI, tasks) File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3736, in bulk_insert_mappings self._bulk_save_mappings( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3843, in _bulk_save_mappings transaction.rollback(_capture_exception=True) File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__ compat.raise_( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_ raise exception File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3831, in _bulk_save_mappings persistence._bulk_insert( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 107, in _bulk_insert _emit_insert_statements( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 1097, in _emit_insert_statements c = connection._execute_20( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1631, in _execute_20 return meth(self, args_10style, kwargs_10style, execution_options) File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 325, in _execute_on_connection return connection._execute_clauseelement( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1498, in _execute_clauseelement ret = self._execute_context( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1862, in _execute_context self._handle_dbapi_exception( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2043, in _handle_dbapi_exception util.raise_( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_ raise exception File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1799, in _execute_context self.dialect.do_executemany( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 953, in do_executemany context._psycopg2_fetched_rows = xtras.execute_values( File "/opt/conda/envs/production/lib/python3.9/site-packages/psycopg2/extras.py", line 1270, in execute_values cur.execute(b''.join(parts)) sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected DETAIL: Process 3398082 waits for ShareLock on transaction 356299; blocked by process 3403341. Process 3403341 waits for ShareLock on transaction 356228; blocked by process 3398082. HINT: See server log for query details. CONTEXT: while locking tuple (0,58) in relation "dag_run" SQL statement "SELECT 1 FROM ONLY "public"."dag_run" x WHERE "dag_id"::pg_catalog.text OPERATOR(pg_catalog.=) $1::pg_catalog.text AND "run_id"::pg_catalog.text OPERATOR(pg_catalog.=) $2::pg_catalog.text FOR KEY SHARE OF x" [SQL: INSERT INTO task_instance (task_id, dag_id, run_id, map_index, try_number, max_tries, hostname, unixname, pool, pool_slots, queue, priority_weight, operator, executor_config) VALUES (%(task_id)s, %(dag_id)s, %(run_id)s, %(map_index)s, %(try_number)s, %(max_tries)s, %(hostname)s, %(unixname)s, %(pool)s, %(pool_slots)s, %(queue)s, %(priority_weight)s, %(operator)s, %(executor_config)s)] [parameters: ({'task_id': 't2', 'dag_id': 'test_backfill_races', 'run_id': 'backfill__2022-02-04T06:00:00+00:00', 'map_index': 8, 'try_number': 0, 'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': '_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary object at 0x7fb6bb920720>}, {'task_id': 't2', 'dag_id': 'test_backfill_races', 'run_id': 'backfill__2022-02-04T06:00:00+00:00', 'map_index': 9, 'try_number': 0, 'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': '_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary object at 0x7fb6bb920f90>}, {'task_id': 't2', 'dag_id': 'test_backfill_races', 'run_id': 'backfill__2022-02-04T06:00:00+00:00', 'map_index': 10, 'try_number': 0, 'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 'default_ pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': '_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary object at 0x7fb6bb920c30>}, {'task_id': 't2', 'dag_id': 'test_backfill_races', 'run_id': 'backfill__2022-02-04T06:00:00+00:00', 'map_index': 11, 'try_number': 0, 'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': '_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary object at 0x7fb6bb9200f0>})] (Background on this error at: https://sqlalche.me/e/14/e3q8) During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/opt/conda/envs/production/bin/airflow", line 11, in <module> sys.exit(main()) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/__main__.py", line 38, in main args.func(args) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 51, in command return func(*args, **kwargs) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", line 99, in wrapper return f(*args, **kwargs) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/cli/commands/dag_command.py", line 107, in dag_backfill dag.run( File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dag.py", line 2288, in run job.run() File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 244, in run self._execute() File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 71, in wrapper return func(*args, session=session, **kwargs) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 876, in _execute session.commit() File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1435, in commit self._transaction.commit(_to_root=self.future) File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 827, in commit self._assert_active(prepared_ok=True) File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 601, in _assert_active raise sa_exc.PendingRollbackError( sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.DeadlockDetected) deadlock detected DETAIL: Process 3398082 waits for ShareLock on transaction 356299; blocked by process 3403341. Process 3403341 waits for ShareLock on transaction 356228; blocked by process 3398082. HINT: See server log for query details. CONTEXT: while locking tuple (0,58) in relation "dag_run" SQL statement "SELECT 1 FROM ONLY "public"."dag_run" x WHERE "dag_id"::pg_catalog.text OPERATOR(pg_catalog.=) $1::pg_catalog.text AND "run_id"::pg_catalog.text OPERATOR(pg_catalog.=) $2::pg_catalog.text FOR KEY SHARE OF x" [SQL: INSERT INTO task_instance (task_id, dag_id, run_id, map_index, try_number, max_tries, hostname, unixname, pool, pool_slots, queue, priority_weight, operator, executor_config) VALUES (%(task_id)s, %(dag_id)s, %(run_id)s, %(map_index)s, %(try_number)s, %(max_tries)s, %(hostname)s, %(unixname)s, %(pool)s, %(pool_slots)s, %(queue)s, %(priority_weight)s, %(operator)s, %(executor_config)s)] [parameters: ({'task_id': 't2', 'dag_id': 'test_backfill_races', 'run_id': 'backfill__2022-02-04T06:00:00+00:00', 'map_index': 8, 'try_number': 0, 'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': '_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary object at 0x7fb6bb920720>}, {'task_id': 't2', 'dag_id': 'test_backfill_races', 'run_id': 'backfill__2022-02-04T06:00:00+00:00', 'map_index': 9, 'try_number': 0, 'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': '_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary object at 0x7fb6bb920f90>}, {'task_id': 't2', 'dag_id': 'test_backfill_races', 'run_id': 'backfill__2022-02-04T06:00:00+00:00', 'map_index': 10, 'try_number': 0, 'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 'default_ pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': '_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary object at 0x7fb6bb920c30>}, {'task_id': 't2', 'dag_id': 'test_backfill_races', 'run_id': 'backfill__2022-02-04T06:00:00+00:00', 'map_index': 11, 'try_number': 0, 'max_tries': 0, 'hostname': '', 'unixname': 'tsanders', 'pool': 'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': '_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary object at 0x7fb6bb9200f0>})] (Background on this error at: https://sqlalche.me/e/14/e3q8) (Background on this error at: https://sqlalche.me/e/14/7s2a) Process QueuedLocalWorker-4: Process QueuedLocalWorker-5: Process QueuedLocalWorker-3: Process QueuedLocalWorker-2: Process QueuedLocalWorker-13: Process QueuedLocalWorker-6: Process QueuedLocalWorker-7: Process QueuedLocalWorker-8: Process QueuedLocalWorker-12: Process QueuedLocalWorker-11: Process QueuedLocalWorker-9: Process QueuedLocalWorker-10: Traceback (most recent call last): File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 190, in do_work self.execute_work(key=key, command=command) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 84, in execute_work state = self._execute_work_in_fork(command) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 102, in _execute_work_in_fork pid, ret = os.waitpid(pid, 0) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", line 271, in sigint_handler sys.exit(0) Traceback (most recent call last): SystemExit: 0 During handling of the above exception, another exception occurred: Traceback (most recent call last): Traceback (most recent call last): File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 190, in do_work self.execute_work(key=key, command=command) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 84, in execute_work state = self._execute_work_in_fork(command) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 102, in _execute_work_in_fork pid, ret = os.waitpid(pid, 0) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", line 271, in sigint_handler sys.exit(0) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap self.run() File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 190, in do_work self.execute_work(key=key, command=command) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 67, in run return super().run() File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 84, in execute_work state = self._execute_work_in_fork(command) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 108, in run self._target(*self._args, **self._kwargs) SystemExit: 0 File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 102, in _execute_work_in_fork pid, ret = os.waitpid(pid, 0) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 192, in do_work self.task_queue.task_done() File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", line 271, in sigint_handler sys.exit(0) During handling of the above exception, another exception occurred: File "<string>", line 2, in task_done Traceback (most recent call last): File "/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 809, in _callmethod conn.send((self._id, methodname, args, kwds)) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 211, in send self._send_bytes(_ForkingPickler.dumps(obj)) SystemExit: 0 File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 416, in _send_bytes self._send(header + buf) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 373, in _send n = write(self._handle, buf) During handling of the above exception, another exception occurred: File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap self.run() BrokenPipeError: [Errno 32] Broken pipe Traceback (most recent call last): File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 67, in run return super().run() File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 108, in run self._target(*self._args, **self._kwargs) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 192, in do_work self.task_queue.task_done() File "<string>", line 2, in task_done File "/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 809, in _callmethod conn.send((self._id, methodname, args, kwds)) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 211, in send self._send_bytes(_ForkingPickler.dumps(obj)) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 416, in _send_bytes self._send(header + buf) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap self.run() File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 373, in _send n = write(self._handle, buf) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 67, in run return super().run() BrokenPipeError: [Errno 32] Broken pipe File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 108, in run self._target(*self._args, **self._kwargs) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 192, in do_work self.task_queue.task_done() File "<string>", line 2, in task_done File "/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 809, in _callmethod conn.send((self._id, methodname, args, kwds)) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 211, in send self._send_bytes(_ForkingPickler.dumps(obj)) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 416, in _send_bytes self._send(header + buf) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 373, in _send n = write(self._handle, buf) BrokenPipeError: [Errno 32] Broken pipe Traceback (most recent call last): Traceback (most recent call last): Traceback (most recent call last): Traceback (most recent call last): Traceback (most recent call last): Traceback (most recent call last): Traceback (most recent call last): File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 190, in do_work self.execute_work(key=key, command=command) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 190, in do_work self.execute_work(key=key, command=command) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 84, in execute_work state = self._execute_work_in_fork(command) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 84, in execute_work state = self._execute_work_in_fork(command) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 190, in do_work self.execute_work(key=key, command=command) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 102, in _execute_work_in_fork pid, ret = os.waitpid(pid, 0) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 190, in do_work self.execute_work(key=key, command=command) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 190, in do_work self.execute_work(key=key, command=command) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 190, in do_work self.execute_work(key=key, command=command) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 102, in _execute_work_in_fork pid, ret = os.waitpid(pid, 0) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 190, in do_work self.execute_work(key=key, command=command) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 84, in execute_work state = self._execute_work_in_fork(command) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", line 271, in sigint_handler sys.exit(0) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 84, in execute_work state = self._execute_work_in_fork(command) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 84, in execute_work state = self._execute_work_in_fork(command) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 84, in execute_work state = self._execute_work_in_fork(command) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", line 271, in sigint_handler sys.exit(0) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 102, in _execute_work_in_fork pid, ret = os.waitpid(pid, 0) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 84, in execute_work state = self._execute_work_in_fork(command) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 102, in _execute_work_in_fork pid, ret = os.waitpid(pid, 0) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 102, in _execute_work_in_fork pid, ret = os.waitpid(pid, 0) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 102, in _execute_work_in_fork pid, ret = os.waitpid(pid, 0) SystemExit: 0 File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", line 271, in sigint_handler sys.exit(0) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 102, in _execute_work_in_fork pid, ret = os.waitpid(pid, 0) SystemExit: 0 File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", line 271, in sigint_handler sys.exit(0) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", line 271, in sigint_handler sys.exit(0) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", line 271, in sigint_handler sys.exit(0) During handling of the above exception, another exception occurred: File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", line 271, in sigint_handler sys.exit(0) SystemExit: 0 During handling of the above exception, another exception occurred: SystemExit: 0 Traceback (most recent call last): Traceback (most recent call last): SystemExit: 0 Traceback (most recent call last): SystemExit: 0 During handling of the above exception, another exception occurred: During handling of the above exception, another exception occurred: SystemExit: 0 Traceback (most recent call last): During handling of the above exception, another exception occurred: During handling of the above exception, another exception occurred: Traceback (most recent call last): Traceback (most recent call last): During handling of the above exception, another exception occurred: Traceback (most recent call last): Traceback (most recent call last): File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 190, in do_work self.execute_work(key=key, command=command) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap self.run() Traceback (most recent call last): File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap self.run() File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 84, in execute_work state = self._execute_work_in_fork(command) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 67, in run return super().run() File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap self.run() File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 190, in do_work self.execute_work(key=key, command=command) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 67, in run return super().run() File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 102, in _execute_work_in_fork pid, ret = os.waitpid(pid, 0) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap self.run() File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 108, in run self._target(*self._args, **self._kwargs) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 67, in run return super().run() File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap self.run() File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap self.run() File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 84, in execute_work state = self._execute_work_in_fork(command) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 108, in run self._target(*self._args, **self._kwargs) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap self.run() File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", line 271, in sigint_handler sys.exit(0) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 67, in run return super().run() File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 192, in do_work self.task_queue.task_done() File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 108, in run self._target(*self._args, **self._kwargs) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 67, in run return super().run() File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 67, in run return super().run() File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 102, in _execute_work_in_fork pid, ret = os.waitpid(pid, 0) SystemExit: 0 File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 192, in do_work self.task_queue.task_done() File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 67, in run return super().run() File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 108, in run self._target(*self._args, **self._kwargs) File "<string>", line 2, in task_done File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 192, in do_work self.task_queue.task_done() File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 108, in run self._target(*self._args, **self._kwargs) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 108, in run self._target(*self._args, **self._kwargs) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", line 271, in sigint_handler sys.exit(0) File "<string>", line 2, in task_done File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 108, in run self._target(*self._args, **self._kwargs) During handling of the above exception, another exception occurred: File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 192, in do_work self.task_queue.task_done() File "/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 809, in _callmethod conn.send((self._id, methodname, args, kwds)) File "<string>", line 2, in task_done File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 192, in do_work self.task_queue.task_done() File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 192, in do_work self.task_queue.task_done() File "/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 809, in _callmethod conn.send((self._id, methodname, args, kwds)) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 192, in do_work self.task_queue.task_done() Traceback (most recent call last): File "<string>", line 2, in task_done SystemExit: 0 File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 211, in send self._send_bytes(_ForkingPickler.dumps(obj)) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 809, in _callmethod conn.send((self._id, methodname, args, kwds)) File "<string>", line 2, in task_done File "<string>", line 2, in task_done File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 211, in send self._send_bytes(_ForkingPickler.dumps(obj)) File "<string>", line 2, in task_done File "/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 809, in _callmethod conn.send((self._id, methodname, args, kwds)) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 416, in _send_bytes self._send(header + buf) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 211, in send self._send_bytes(_ForkingPickler.dumps(obj)) During handling of the above exception, another exception occurred: File "/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 809, in _callmethod conn.send((self._id, methodname, args, kwds)) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 809, in _callmethod conn.send((self._id, methodname, args, kwds)) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 211, in send self._send_bytes(_ForkingPickler.dumps(obj)) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 416, in _send_bytes self._send(header + buf) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 809, in _callmethod conn.send((self._id, methodname, args, kwds)) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 211, in send self._send_bytes(_ForkingPickler.dumps(obj)) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 373, in _send n = write(self._handle, buf) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 416, in _send_bytes self._send(header + buf) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap self.run() Traceback (most recent call last): File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 211, in send self._send_bytes(_ForkingPickler.dumps(obj)) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 416, in _send_bytes self._send(header + buf) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 416, in _send_bytes self._send(header + buf) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 373, in _send n = write(self._handle, buf) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 211, in send self._send_bytes(_ForkingPickler.dumps(obj)) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 416, in _send_bytes self._send(header + buf) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 373, in _send n = write(self._handle, buf) BrokenPipeError: [Errno 32] Broken pipe BrokenPipeError: [Errno 32] Broken pipe File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 67, in run return super().run() File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 373, in _send n = write(self._handle, buf) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 373, in _send n = write(self._handle, buf) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 416, in _send_bytes self._send(header + buf) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap self.run() File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 373, in _send n = write(self._handle, buf) BrokenPipeError: [Errno 32] Broken pipe File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 108, in run self._target(*self._args, **self._kwargs) BrokenPipeError: [Errno 32] Broken pipe BrokenPipeError: [Errno 32] Broken pipe File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 373, in _send n = write(self._handle, buf) File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 67, in run return super().run() BrokenPipeError: [Errno 32] Broken pipe File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 192, in do_work self.task_queue.task_done() BrokenPipeError: [Errno 32] Broken pipe File "/opt/conda/envs/production/lib/python3.9/multiprocessing/process.py", line 108, in run self._target(*self._args, **self._kwargs) File "<string>", line 2, in task_done File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 192, in do_work self.task_queue.task_done() File "/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 809, in _callmethod conn.send((self._id, methodname, args, kwds)) File "<string>", line 2, in task_done File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 211, in send self._send_bytes(_ForkingPickler.dumps(obj)) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/managers.py", line 809, in _callmethod conn.send((self._id, methodname, args, kwds)) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 416, in _send_bytes self._send(header + buf) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 211, in send self._send_bytes(_ForkingPickler.dumps(obj)) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 373, in _send n = write(self._handle, buf) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 416, in _send_bytes self._send(header + buf) File "/opt/conda/envs/production/lib/python3.9/multiprocessing/connection.py", line 373, in _send n = write(self._handle, buf) BrokenPipeError: [Errno 32] Broken pipe BrokenPipeError: [Errno 32] Broken pipe ``` </details> When I tried to run this backfill on our production Airflow deployment, I got the following error instead: ``` Traceback (most recent call last): File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1799, in _execute_context self.dialect.do_executemany( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 953, in do_executemany context._psycopg2_fetched_rows = xtras.execute_values( File "/opt/conda/envs/production/lib/python3.9/site-packages/psycopg2/extras.py", line 1270, in execute_values cur.execute(b''.join(parts)) psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "task_instance_pkey" DETAIL: Key (dag_id, task_id, run_id, map_index)=(test_backfill_races, t2, backfill__2022-01-04T00:00:00-06:00, 24) already exists. The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/dagrun.py", line 1048, in _create_task_instances session.bulk_save_objects(tasks) File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3627, in bulk_save_objects self._bulk_save_mappings( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3843, in _bulk_save_mappings transaction.rollback(_capture_exception=True) File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__ compat.raise_( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_ raise exception File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3831, in _bulk_save_mappings persistence._bulk_insert( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 107, in _bulk_insert _emit_insert_statements( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 1097, in _emit_insert_statements c = connection._execute_20( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1631, in _execute_20 return meth(self, args_10style, kwargs_10style, execution_options) File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 325, in _execute_on_connection return connection._execute_clauseelement( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1498, in _execute_clauseelement ret = self._execute_context( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1862, in _execute_context self._handle_dbapi_exception( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2043, in _handle_dbapi_exception util.raise_( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_ raise exception File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1799, in _execute_context self.dialect.do_executemany( File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 953, in do_executemany context._psycopg2_fetched_rows = xtras.execute_values( File "/opt/conda/envs/production/lib/python3.9/site-packages/psycopg2/extras.py", line 1270, in execute_values cur.execute(b''.join(parts)) sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "task_instance_pkey" DETAIL: Key (dag_id, task_id, run_id, map_index)=(test_backfill_races, t2, backfill__2022-01-04T00:00:00-06:00, 24) already exists. [SQL: INSERT INTO task_instance (task_id, dag_id, run_id, map_index, try_number, max_tries, hostname, unixname, pool, pool_slots, queue, priority_weight, operator, executor_config) VALUES (%(task_id)s, %(dag_id)s, %(run_id)s, %(map_index)s, %(try_number)s, %(max_tries)s, %(hostname)s, %(unixname)s, %(pool)s, %(pool_slots)s, %(queue)s, %(priority_weight)s, %(operator)s, %(executor_config)s)] [parameters: ({'task_id': 't2', 'dag_id': 'test_backfill_races', 'run_id': 'backfill__2022-01-04T00:00:00-06:00', 'map_index': 24, 'try_number': 0, 'max_tries': 0, 'hostname': '', 'unixname': 'tradebot', 'pool': 'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': '_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary object at 0x7f2522faf2d0>}, {'task_id': 't2', 'dag_id': 'test_backfill_races', 'run_id': 'backfill__2022-01-04T00:00:00-06:00', 'map_index': 25, 'try_number': 0, 'max_tries': 0, 'hostname': '', 'unixname': 'tradebot', 'pool': 'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': '_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary object at 0x7f2522faf0f0>}, {'task_id': 't2', 'dag_id': 'test_backfill_races', 'run_id': 'backfill__2022-01-04T00:00:00-06:00', 'map_index': 26, 'try_number': 0, 'max_tries': 0, 'hostname': '', 'unixname': 'tradebot', 'pool': 'defaul t_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': '_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary object at 0x7f2522faf8d0>}, {'task_id': 't2', 'dag_id': 'test_backfill_races', 'run_id': 'backfill__2022-01-04T00:00:00-06:00', 'map_index': 27, 'try_number': 0, 'max_tries': 0, 'hostname': '', 'unixname': 'tradebot', 'pool': 'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': '_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary object at 0x7f2522fafbd0>})] (Background on this error at: https://sqlalche.me/e/14/gkpj) ``` ### What you think should happen instead Mapped tasks should be able to be backfilled like "normal" tasks. ### How to reproduce ``` #!/usr/bin/env python3 import datetime import itertools import logging import string from airflow.decorators import dag, task logger = logging.getLogger(__name__) TASK_COUNT = 70 # turning this up increases the likelihood of issues @dag( schedule_interval='@daily', start_date=datetime.datetime(2022, 8, 15), default_args={ 'retries': 0, }, ) def test_backfill_races(): @task def get_tasks(): return list(itertools.islice(itertools.cycle(string.ascii_letters), TASK_COUNT)) @task def t1(arg): logger.info(f'{arg=}') return arg @task def t2(arg): logger.info(f'{arg=}') t2.expand(arg=t1.expand(arg=get_tasks())) dag = test_backfill_races() if __name__ == '__main__': dag.cli() ``` ``` airflow dags backfill test_backfill_races -s 2022-08-01 -e 2022-08-10 ``` ### Operating System CentOS Stream 8 ### Versions of Apache Airflow Providers None ### Deployment Other ### Deployment details 1. Standalone w/ Postgres DB backend. Relevant configs: a. `core.executor = LocalExecutor` b. `core.parallelism = 12` c. `core.max_active_tasks_per_dag = 16` d. `core.max_active_runs_per_dag = 16` 2. Self-hosted w/ Postgres DB backend. Relevant configs: a. `core.executor = CeleryExecutor` b. `core.parallelism = 256` c. `core.max_active_tasks_per_dag = 128` d. `core.max_active_runs_per_dag = 16` e. `celery.worker_concurrency = 16` ### Anything else Possibly related: #16982 Turning down the effective parallelism of the DAG (e.g. `max_active_tasks=4`) sometimes helps to avoid this, but (obviously) puts a significant bottleneck on the backfill. ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
