ephraimbuddy opened a new pull request #21901:
URL: https://github.com/apache/airflow/pull/21901


   Currently, we nullify `next_dagrun_create_after` when a DAG reaches
   `max_active_runs`. This causes deadlocks when running multiple
   schedulers (3 and above). The deadlock happens at the point of updating
   `next_dagrun_create_after` (`dag.calculate_dagrun_date_fields`).
   I think nullifying `next_dagrun_create_after` is an expensive operation,
   so I have instead added a boolean column on `DagModel` to track whether
   `max_active_runs` has been reached.
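A rough sketch of the idea, in plain Python rather than the actual ORM code in this PR (the field and function names below are illustrative, not necessarily the ones used in the change): the scheduler keeps `next_dagrun_create_after` intact and only flips a cheap boolean when the cap is hit, so the hot `dag` row no longer needs the extra nullify/restore updates.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List


@dataclass
class DagModelSketch:
    # Hypothetical stand-in for the DagModel row; field names are
    # illustrative, not necessarily those used in the PR.
    dag_id: str
    next_dagrun_create_after: datetime
    max_active_runs: int
    max_active_runs_reached: bool = False  # the proposed boolean column


def update_after_dagrun_created(dag: DagModelSketch, active_runs: int) -> None:
    # Instead of nullifying next_dagrun_create_after (which forces another
    # UPDATE on the same row later to restore it), only toggle the boolean.
    dag.max_active_runs_reached = active_runs >= dag.max_active_runs


def dags_needing_dagruns(dags: List[DagModelSketch], now: datetime) -> List[DagModelSketch]:
    # Scheduler query sketch: skip DAGs at their cap without ever having
    # cleared next_dagrun_create_after.
    return [
        d for d in dags
        if not d.max_active_runs_reached and d.next_dagrun_create_after <= now
    ]
```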
   
   Here's the error I got from running three schedulers with 100 DAGs, each
   containing one datetime sensor.
   
   ```
   [2022-03-01 09:48:05,479] {scheduler_job.py:707} INFO - Exited execute loop
   Traceback (most recent call last):
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_context
       self.dialect.do_execute(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 716, in do_execute
       cursor.execute(statement, parameters)
   psycopg2.errors.DeadlockDetected: deadlock detected
   DETAIL:  Process 11782 waits for ShareLock on transaction 129251; blocked by process 6989.
   Process 6989 waits for ShareLock on transaction 128879; blocked by process 11782.
   HINT:  See server log for query details.
   CONTEXT:  while updating tuple (1,59) in relation "dag"


   The above exception was the direct cause of the following exception:

   Traceback (most recent call last):
     File "/usr/local/bin/airflow", line 33, in <module>
       sys.exit(load_entry_point('apache-airflow', 'console_scripts', 'airflow')())
     File "/opt/airflow/airflow/__main__.py", line 48, in main
       args.func(args)
     File "/opt/airflow/airflow/cli/cli_parser.py", line 49, in command
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/utils/cli.py", line 99, in wrapper
       return f(*args, **kwargs)
     File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
       _run_scheduler_job(args=args)
     File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
       job.run()
     File "/opt/airflow/airflow/jobs/base_job.py", line 244, in run
       self._execute()
     File "/opt/airflow/airflow/jobs/scheduler_job.py", line 680, in _execute
       self._run_scheduler_loop()
     File "/opt/airflow/airflow/jobs/scheduler_job.py", line 766, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File "/opt/airflow/airflow/jobs/scheduler_job.py", line 852, in _do_scheduling
       guard.commit()
     File "/opt/airflow/airflow/utils/sqlalchemy.py", line 281, in commit
       self.session.commit()
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1423, in commit
       self._transaction.commit(_to_root=self.future)
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 829, in commit
       self._prepare_impl()
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 808, in _prepare_impl
       self.session.flush()
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3255, in flush
       self._flush(objects)
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3395, in _flush
       transaction.rollback(_capture_exception=True)
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
       compat.raise_(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
       raise exception
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3355, in _flush
       flush_context.execute()
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 450, in execute
       n.execute_aggregate(self, set_)
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 744, in execute_aggregate
       persistence.save_obj(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 234, in save_obj
       _emit_update_statements(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 998, in _emit_update_statements
       c = connection._execute_20(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1520, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 313, in _execute_on_connection
       return connection._execute_clauseelement(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1389, in _execute_clauseelement
       ret = self._execute_context(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1748, in _execute_context
       self._handle_dbapi_exception(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1929, in _handle_dbapi_exception
       util.raise_(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
       raise exception
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_context
       self.dialect.do_execute(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 716, in do_execute
       cursor.execute(statement, parameters)
   sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
   DETAIL:  Process 11782 waits for ShareLock on transaction 129251; blocked by process 6989.
   Process 6989 waits for ShareLock on transaction 128879; blocked by process 11782.
   HINT:  See server log for query details.
   CONTEXT:  while updating tuple (1,59) in relation "dag"

   [SQL: UPDATE dag SET next_dagrun=%(next_dagrun)s, next_dagrun_data_interval_start=%(next_dagrun_data_interval_start)s, next_dagrun_data_interval_end=%(next_dagrun_data_interval_end)s, next_dagrun_create_after=%(next_dagrun_create_after)s WHERE dag.dag_id = %(dag_dag_id)s]
   [parameters: {'next_dagrun': DateTime(2021, 1, 10, 0, 0, 0, tzinfo=Timezone('UTC')), 'next_dagrun_data_interval_start': DateTime(2021, 1, 10, 0, 0, 0, tzinfo=Timezone('UTC')), 'next_dagrun_data_interval_end': DateTime(2021, 1, 11, 0, 0, 0, tzinfo=Timezone('UTC')), 'next_dagrun_create_after': DateTime(2021, 1, 11, 0, 0, 0, tzinfo=Timezone('UTC')), 'dag_dag_id': 'sensor_75'}]
   (Background on this error at: http://sqlalche.me/e/14/e3q8)
   ```
   
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/main/UPDATING.md).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
