ephraimbuddy opened a new pull request #21901:
URL: https://github.com/apache/airflow/pull/21901
Currently, we nullify next_dagrun_create_after when a DAG reaches its
max_active_runs limit. This causes deadlocks when running multiple
schedulers (three or more). The deadlock happens at the point of updating
next_dagrun_create_after (dag.calculate_dagrun_date_fields).
I think nullifying next_dagrun_create_after is an expensive operation,
so I have created a boolean column on DagModel to track whether
max_active_runs has been reached instead.
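
A minimal sketch of the boolean-column idea, for illustration only and not the code in this PR. It uses the standard-library `sqlite3` module instead of Airflow's SQLAlchemy models. The column name `max_active_runs_reached` and the helper functions are hypothetical. It only shows that a flag can exclude a DAG from run creation while its date fields are left untouched. It does not reproduce or prove anything about the deadlock.

```python
import sqlite3


def make_db():
    """Create an in-memory stand-in for the ``dag`` table (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE dag ("
        " dag_id TEXT PRIMARY KEY,"
        " next_dagrun_create_after TEXT,"
        # Hypothetical flag column; the real column name may differ.
        " max_active_runs_reached INTEGER NOT NULL DEFAULT 0)"
    )
    return conn


def dags_needing_runs(conn, now):
    """Return ids of DAGs that are due and have not hit max_active_runs.

    Timestamps are ISO 8601 strings in the same timezone, so plain string
    comparison orders them correctly.
    """
    rows = conn.execute(
        "SELECT dag_id FROM dag"
        " WHERE next_dagrun_create_after <= ?"
        " AND max_active_runs_reached = 0"
        " ORDER BY dag_id",
        (now,),
    )
    return [row[0] for row in rows]


def set_max_active_runs_reached(conn, dag_id, reached):
    """Flip only the flag; next_dagrun_create_after is not modified."""
    conn.execute(
        "UPDATE dag SET max_active_runs_reached = ? WHERE dag_id = ?",
        (int(reached), dag_id),
    )


if __name__ == "__main__":
    conn = make_db()
    conn.execute("INSERT INTO dag VALUES ('sensor_1', '2021-01-11T00:00:00', 0)")
    conn.execute("INSERT INTO dag VALUES ('sensor_2', '2021-01-11T00:00:00', 0)")
    set_max_active_runs_reached(conn, "sensor_1", True)
    print(dags_needing_runs(conn, "2021-01-12T00:00:00"))
```

With this shape, the query that picks DAGs for run creation filters on the flag, so the scheduler does not have to null out and later recompute next_dagrun_create_after each time the limit is hit or cleared.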
Here's the error I got from running three schedulers with 100 DAGs, each
with one datetime sensor.
```
[2022-03-01 09:48:05,479] {scheduler_job.py:707} INFO - Exited execute loop
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 716, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.DeadlockDetected: deadlock detected
DETAIL: Process 11782 waits for ShareLock on transaction 129251; blocked by process 6989.
Process 6989 waits for ShareLock on transaction 128879; blocked by process 11782.
HINT: See server log for query details.
CONTEXT: while updating tuple (1,59) in relation "dag"

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 33, in <module>
    sys.exit(load_entry_point('apache-airflow', 'console_scripts', 'airflow')())
  File "/opt/airflow/airflow/__main__.py", line 48, in main
    args.func(args)
  File "/opt/airflow/airflow/cli/cli_parser.py", line 49, in command
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/utils/cli.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
    _run_scheduler_job(args=args)
  File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
    job.run()
  File "/opt/airflow/airflow/jobs/base_job.py", line 244, in run
    self._execute()
  File "/opt/airflow/airflow/jobs/scheduler_job.py", line 680, in _execute
    self._run_scheduler_loop()
  File "/opt/airflow/airflow/jobs/scheduler_job.py", line 766, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/opt/airflow/airflow/jobs/scheduler_job.py", line 852, in _do_scheduling
    guard.commit()
  File "/opt/airflow/airflow/utils/sqlalchemy.py", line 281, in commit
    self.session.commit()
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1423, in commit
    self._transaction.commit(_to_root=self.future)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 829, in commit
    self._prepare_impl()
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 808, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3255, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3395, in _flush
    transaction.rollback(_capture_exception=True)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3355, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 450, in execute
    n.execute_aggregate(self, set_)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 744, in execute_aggregate
    persistence.save_obj(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 234, in save_obj
    _emit_update_statements(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 998, in _emit_update_statements
    c = connection._execute_20(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1520, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 313, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1389, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1748, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1929, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 716, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
DETAIL: Process 11782 waits for ShareLock on transaction 129251; blocked by process 6989.
Process 6989 waits for ShareLock on transaction 128879; blocked by process 11782.
HINT: See server log for query details.
CONTEXT: while updating tuple (1,59) in relation "dag"
[SQL: UPDATE dag SET next_dagrun=%(next_dagrun)s, next_dagrun_data_interval_start=%(next_dagrun_data_interval_start)s, next_dagrun_data_interval_end=%(next_dagrun_data_interval_end)s, next_dagrun_create_after=%(next_dagrun_create_after)s WHERE dag.dag_id = %(dag_dag_id)s]
[parameters: {'next_dagrun': DateTime(2021, 1, 10, 0, 0, 0, tzinfo=Timezone('UTC')), 'next_dagrun_data_interval_start': DateTime(2021, 1, 10, 0, 0, 0, tzinfo=Timezone('UTC')), 'next_dagrun_data_interval_end': DateTime(2021, 1, 11, 0, 0, 0, tzinfo=Timezone('UTC')), 'next_dagrun_create_after': DateTime(2021, 1, 11, 0, 0, 0, tzinfo=Timezone('UTC')), 'dag_dag_id': 'sensor_75'}]
(Background on this error at: http://sqlalche.me/e/14/e3q8)
```