dstaple opened a new issue, #27473:
URL: https://github.com/apache/airflow/issues/27473

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   The Airflow scheduler exits with `psycopg2.errors.DeadlockDetected`, and 
several running tasks fail with SIGTERMs.
   
   One of the queries involved in the deadlock originates from the scheduler 
and is of the form
   ```
   UPDATE task_instance SET state='scheduled' ...
   ```
   This can be seen in both the scheduler and database logs, and originates 
here:
   ```
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/models/dagrun.py", 
line 909, in schedule_tis
       .update({TI.state: State.SCHEDULED}, synchronize_session=False)
   ```
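   For reference, the shape of that bulk update — a single `UPDATE` with an `IN` list over task ids, which in Postgres takes row locks on every matching `task_instance` row in one statement — can be reproduced with a minimal, self-contained `sqlite3` sketch. The table and column names follow the logs; this is an illustration of the query shape only, not Airflow code:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (dag_id TEXT, run_id TEXT, task_id TEXT, state TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [("d1", "r1", f"t{i}", "queued") for i in range(5)],
)

# One statement touches (and, in Postgres, row-locks) every matching row at once.
task_ids = ["t0", "t1", "t2"]
placeholders = ", ".join("?" for _ in task_ids)
conn.execute(
    f"UPDATE task_instance SET state = 'scheduled' "
    f"WHERE dag_id = ? AND run_id = ? AND task_id IN ({placeholders})",
    ["d1", "r1", *task_ids],
)
scheduled = conn.execute(
    "SELECT COUNT(*) FROM task_instance WHERE state = 'scheduled'"
).fetchone()[0]
```

   The larger the `IN` list, the longer those row locks are held within the transaction, which widens the window for a conflicting writer.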
   
   The other query involved in the deadlock is shown in the database logs as 
follows:
   ```
   UPDATE dag_run SET 
last_scheduling_decision='2022-11-01T00:00:22.409295+00:00'::timestamptz WHERE 
dag_run.id = 841582
   ```
   From reading the Airflow source code, this seems likely to originate from 
`airflow/models/dagrun.py` in the function `update_state()`.
   
   Database logs look as follows:
   ```
   2022-11-01 03:57:41.327 UTC [174772] ERROR:  deadlock detected
   2022-11-01 03:57:41.327 UTC [174772] DETAIL:  Process 174772 waits for 
ShareLock on transaction 780275485; blocked by process 176738.
        Process 176738 waits for ExclusiveLock on tuple (15622,258) of relation 
599796 of database 599710; blocked by process 176552.
        Process 176552 waits for ShareLock on transaction 780275254; blocked by 
process 174772.
        Process 174772: UPDATE task_instance SET state='scheduled' WHERE 
task_instance.dag_id = 'my_sanitized_dag_id_1' AND task_instance.run_id = 
'my_sanitized_run_id_1' AND task_instance.task_id IN ('my_sanitized_task_id_1', 
'my_sanitized_task_id_2', 'my_sanitized_task_id_3', 'my_sanitized_task_id_4', 
'my_sanitized_task_id_5', 'my_sanitized_task_id_6', 'my_sanitized_task_id_7', 
'my_sanitized_task_id_8', 'my_sanitized_task_id_9', 'my_sanitized_task_id_10', 
'my_sanitized_task_id_11', 'my_sanitized_task_id_12', 
'my_sanitized_task_id_13', 'my_sanitized_task_id_14', 'my_sanitized_task_id_15')
        Process 176738: UPDATE dag_run SET 
last_scheduling_decision='2022-11-01T00:00:22.409295+00:00'::timestamptz WHERE 
dag_run.id = 841582
        Process 176552: UPDATE dag_run SET 
last_scheduling_decision='2022-11-01T00:00:22.409295+00:00'::timestamptz WHERE 
dag_run.id = 841582
   2022-11-01 03:57:41.327 UTC [174772] HINT:  See server log for query details.
   2022-11-01 03:57:41.327 UTC [174772] CONTEXT:  while updating tuple 
(227875,18) in relation "task_instance"
   2022-11-01 03:57:41.327 UTC [174772] STATEMENT:  UPDATE task_instance SET 
state='scheduled' WHERE task_instance.dag_id = 'my_sanitized_dag_id_1' AND 
task_instance.run_id = 'my_sanitized_run_id_1' AND task_instance.task_id IN 
('my_sanitized_task_id_1', 'my_sanitized_task_id_2', 'my_sanitized_task_id_3', 
'my_sanitized_task_id_4', 'my_sanitized_task_id_5', 'my_sanitized_task_id_6', 
'my_sanitized_task_id_7', 'my_sanitized_task_id_8', 'my_sanitized_task_id_9', 
'my_sanitized_task_id_10', 'my_sanitized_task_id_11', 
'my_sanitized_task_id_12', 'my_sanitized_task_id_13', 
'my_sanitized_task_id_14', 'my_sanitized_task_id_15')
   ```
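   The cycle in the `DETAIL` lines above is a classic lock-order inversion: one transaction holds `task_instance` row locks and waits on a `dag_run` row, while another holds the `dag_run` row and waits on `task_instance`. The mechanism can be demonstrated Airflow-independently with plain threading locks standing in for the row locks (all names here are illustrative; real Postgres row locks are taken implicitly by the `UPDATE`s):

```python
import threading
import time

def acquire_both(first, second, hold=0.1, timeout=0.5):
    """Take two locks in order; return False if the second can't be taken
    in time (the analogue of Postgres aborting a deadlock victim)."""
    with first:
        time.sleep(hold)  # hold the first "row lock", as a long transaction would
        if second.acquire(timeout=timeout):
            second.release()
            return True
        return False

def run_pair(order_a, order_b):
    """Run two 'transactions' concurrently and collect their outcomes."""
    results = {}
    barrier = threading.Barrier(2)

    def worker(name, order):
        barrier.wait()  # start together, like two concurrent sessions
        results[name] = acquire_both(*order)

    threads = [
        threading.Thread(target=worker, args=("a", order_a)),
        threading.Thread(target=worker, args=("b", order_b)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results

ti_lock = threading.Lock()  # stand-in for the task_instance row locks
dr_lock = threading.Lock()  # stand-in for the dag_run row lock

# Opposite acquisition order (the schedule_tis vs update_state analogue):
# the two workers cannot both finish; at least one times out.
mixed = run_pair((ti_lock, dr_lock), (dr_lock, ti_lock))

# Consistent acquisition order: both complete.
consistent = run_pair((ti_lock, dr_lock), (ti_lock, dr_lock))
```

   The usual fix for this class of problem is to make all writers touch the two tables in the same order, or to take the `dag_run` row lock up front before updating its task instances.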
   
   The stack trace in the Airflow scheduler looks as follows:
   ```
   [2022-11-01 03:57:51,140] {{scheduler_job.py:753}} INFO - Exited execute loop
   Traceback (most recent call last):
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", 
line 1277, in _execute_context
       cursor, statement, parameters, context
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/default.py",
 line 609, in do_execute
       cursor.execute(statement, parameters)
   psycopg2.errors.DeadlockDetected: deadlock detected
   DETAIL:  Process 174772 waits for ShareLock on transaction 780275485; 
blocked by process 176738.
   Process 176738 waits for ExclusiveLock on tuple (15622,258) of relation 
599796 of database 599710; blocked by process 176552.
   Process 176552 waits for ShareLock on transaction 780275254; blocked by 
process 174772.
   HINT:  See server log for query details.
   CONTEXT:  while updating tuple (227875,18) in relation "task_instance"
   
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/opt/airflow/venv-py3/bin/airflow", line 8, in <module>
       sys.exit(main())
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/__main__.py", line 
48, in main
       args.func(args)
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/cli_parser.py", 
line 48, in command
       return func(*args, **kwargs)
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/utils/cli.py", line 
92, in wrapper
       return f(*args, **kwargs)
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py",
 line 75, in scheduler
       _run_scheduler_job(args=args)
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py",
 line 46, in _run_scheduler_job
       job.run()
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/base_job.py", 
line 246, in run
       self._execute()
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 726, in _execute
       self._run_scheduler_loop()
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 807, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 890, in _do_scheduling
       callback_to_run = self._schedule_dag_run(dag_run, session)
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 1147, in _schedule_dag_run
       dag_run.schedule_tis(schedulable_tis, session)
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/utils/session.py", 
line 67, in wrapper
       return func(*args, **kwargs)
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/models/dagrun.py", 
line 909, in schedule_tis
       .update({TI.state: State.SCHEDULED}, synchronize_session=False)
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", 
line 4063, in update
       update_op.exec_()
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py",
 line 1697, in exec_
       self._do_exec()
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py",
 line 1895, in _do_exec
       self._execute_stmt(update_stmt)
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py",
 line 1702, in _execute_stmt
       self.result = self.query._execute_crud(stmt, self.mapper)
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", 
line 3568, in _execute_crud
       return conn.execute(stmt, self._params)
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", 
line 1011, in execute
       return meth(self, multiparams, params)
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", 
line 298, in _execute_on_connection
       return connection._execute_clauseelement(self, multiparams, params)
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", 
line 1130, in _execute_clauseelement
       distilled_params,
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", 
line 1317, in _execute_context
       e, statement, parameters, cursor, context
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", 
line 1511, in _handle_dbapi_exception
       sqlalchemy_exception, with_traceback=exc_info[2], from_=e
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/util/compat.py", 
line 182, in raise_
       raise exception
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", 
line 1277, in _execute_context
       cursor, statement, parameters, context
     File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/default.py",
 line 609, in do_execute
       cursor.execute(statement, parameters)
   sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock 
detected
   DETAIL:  Process 174772 waits for ShareLock on transaction 780275485; 
blocked by process 176738.
   Process 176738 waits for ExclusiveLock on tuple (15622,258) of relation 
599796 of database 599710; blocked by process 176552.
   Process 176552 waits for ShareLock on transaction 780275254; blocked by 
process 174772.
   HINT:  See server log for query details.
   CONTEXT:  while updating tuple (227875,18) in relation "task_instance"
   
   [SQL: UPDATE task_instance SET state=%(state)s WHERE task_instance.dag_id = 
%(dag_id_1)s AND task_instance.run_id = %(run_id_1)s AND task_instance.task_id 
IN (%(task_id_1)s, %(task_id_2)s, %(task_id_3)s, %(task_id_4)s, %(task_id_5)s, 
%(task_id_6)s, %(task_id_7)s, %(task_id_8)s, %(task_id_9)s, %(task_id_10)s, 
%(task_id_11)s, %(task_id_12)s, %(task_id_13)s, %(task_id_14)s, %(task_id_15)s)]
   [parameters: {'state': <TaskInstanceState.SCHEDULED: 'scheduled'>, 
'dag_id_1': 'my_sanitized_dag_id_1', 'run_id_1': 'my_sanitized_run_id_1', 
'task_id_1': 'my_sanitized_task_id_1', 'task_id_2': 'my_sanitized_task_id_2', 
'task_id_3': 'my_sanitized_task_id_3', 'task_id_4': 'my_sanitized_task_id_4', 
'task_id_5': 'my_sanitized_task_id_5', 'task_id_6': 'my_sanitized_task_id_6', 
'task_id_7': 'my_sanitized_task_id_7', 'task_id_8': 'my_sanitized_task_id_8', 
'task_id_9': 'my_sanitized_task_id_9', 'task_id_10': 'my_sanitized_task_id_10', 
'task_id_11': 'my_sanitized_task_id_11', 'task_id_12': 
'my_sanitized_task_id_12', 'task_id_13': 'my_sanitized_task_id_13', 
'task_id_14': 'my_sanitized_task_id_14', 'task_id_15': 
'my_sanitized_task_id_15'}]
   (Background on this error at: http://sqlalche.me/e/13/e3q8)
   ```
   
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   The problem is not easily reproducible. It occurs approximately once every 
two weeks when operating at scale (50-150 DAGs, several of which have hundreds 
of tasks).
   
   
   ### Operating System
   
   CentOS 7
   
   ### Versions of Apache Airflow Providers
   
   ```
   apache-airflow-providers-cncf-kubernetes==2.0.3
   apache-airflow-providers-elasticsearch==2.0.3
   apache-airflow-providers-ftp==2.1.2
   apache-airflow-providers-http==2.1.2
   apache-airflow-providers-imap==2.2.3
   apache-airflow-providers-mysql==2.1.1
   apache-airflow-providers-postgres==2.3.0
   apache-airflow-providers-sqlite==2.1.3
   ```
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   * Airflow 2.2.5 patched by cherry-picking [be2b53eaaf Don't mistakenly take a lock on DagRun via ti.refresh_from_db (#25312)](https://github.com/apache/airflow/pull/25312).
   * KubernetesExecutor
   * A single Airflow scheduler is running.
   * Row level locking is enabled.
   * Scheduler parsing_processes = 5
   * Scheduler resources: 8 cores, 5 GB RAM
   * Database resources: 12 cores, 8 GB RAM (Postgres 11.3)
   
   ### Anything else
   
   This issue is distinct from the previous deadlock issue reported in 
https://github.com/apache/airflow/issues/23361 and fixed by 
https://github.com/apache/airflow/pull/25312.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

