vchiapaikeo opened a new issue, #32698:
URL: https://github.com/apache/airflow/issues/32698

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   We are on 2.5.3 but, after inspecting the source, believe this issue is 
also present in 2.6.3.
   
   During peak times, we notice that we miss heartbeats on one or two of our 
scheduler replicas for almost 5 minutes:
   
   <img width="708" alt="image" src="https://github.com/apache/airflow/assets/9200263/e24b7d28-6a3e-4fe4-9066-7a2f2cfb8121">
   
   Upon closer inspection, it seems that the scheduler replica is stuck on this 
function which checks trigger timeouts (check_trigger_timeouts): 
https://github.com/apache/airflow/blob/2.6.3/airflow/jobs/scheduler_job_runner.py#L1632-L1656
   
   We believe this is because other scheduler replicas are trying to run the 
exact same UPDATE on the task_instance table at the same time.
   
   Full stack trace that shows the deadlocked query:
   
   ```
   [2023-07-18 00:03:21,130] {scheduler_job.py:776} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
       self.dialect.do_execute(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
       cursor.execute(statement, parameters)
     File "/home/airflow/.local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 206, in execute
       res = self._query(query)
     File "/home/airflow/.local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 319, in _query
       db.query(q)
     File "/home/airflow/.local/lib/python3.10/site-packages/MySQLdb/connections.py", line 254, in query
       _mysql.connection.query(self, query)
   MySQLdb.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')

   The above exception was the direct cause of the following exception:

   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 759, in _execute
       self._run_scheduler_loop()
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 897, in _run_scheduler_loop
       next_event = timers.run(blocking=False)
     File "/usr/local/lib/python3.10/sched.py", line 151, in run
       action(*argument, **kwargs)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/event_scheduler.py", line 37, in repeat
       action(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 75, in wrapper
       return func(*args, session=session, **kwargs)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1501, in check_trigger_timeouts
       .update(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/query.py", line 3306, in update
       result = self.session.execute(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1714, in execute
       result = conn._execute_20(statement, params or {}, execution_options)
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
       return connection._execute_clauseelement(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
       ret = self._execute_context(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
       self._handle_dbapi_exception(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2124, in _handle_dbapi_exception
       util.raise_(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
       raise exception
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
       self.dialect.do_execute(
     File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
       cursor.execute(statement, parameters)
     File "/home/airflow/.local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 206, in execute
       res = self._query(query)
     File "/home/airflow/.local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 319, in _query
       db.query(q)
     File "/home/airflow/.local/lib/python3.10/site-packages/MySQLdb/connections.py", line 254, in query
       _mysql.connection.query(self, query)
   sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
   [SQL: UPDATE task_instance SET state=%s, updated_at=%s, trigger_id=%s, next_method=%s, next_kwargs=%s WHERE task_instance.state = %s AND task_instance.trigger_timeout < %s]
   [parameters: (<TaskInstanceState.SCHEDULED: 'scheduled'>, datetime.datetime(2023, 7, 18, 0, 3, 13, 737519), None, '__fail__', '{"__var": {"error": "Trigger/execution timeout"}, "__type": "dict"}', <TaskInstanceState.DEFERRED: 'deferred'>, datetime.datetime(2023, 7, 18, 0, 3, 13, 736019, tzinfo=Timezone('UTC')))]
   (Background on this error at: https://sqlalche.me/e/14/e3q8)
   [2023-07-18 00:03:21,135] {kubernetes_executor.py:885} INFO - Shutting down Kubernetes executor
   ```
   
   Logs missing for a single replica during deadlocked period:
   
   <img width="1427" alt="image" src="https://github.com/apache/airflow/assets/9200263/b20faae4-e5b2-4a25-a145-ccb7b8e1e631">
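
The MySQL error in the trace (1213) itself says "try restarting transaction". As a rough illustration only, and not what Airflow currently does, the sketch below retries a callable when a deadlock is detected. The helper names `is_deadlock` and `run_with_deadlock_retry` are hypothetical; the only library behaviour assumed is that MySQLdb puts the error number in `args[0]` and that SQLAlchemy exposes the wrapped driver error as `.orig`.

```python
import time

MYSQL_DEADLOCK_ERRNO = 1213  # error number reported in the stack trace


def is_deadlock(exc):
    """Return True if exc, or an error it wraps, carries MySQL errno 1213."""
    seen = set()
    while exc is not None and id(exc) not in seen:
        seen.add(id(exc))
        if exc.args and exc.args[0] == MYSQL_DEADLOCK_ERRNO:
            return True
        # SQLAlchemy wraps the driver exception and exposes it as `.orig`;
        # otherwise fall back to the standard `raise ... from` chain.
        exc = getattr(exc, "orig", None) or exc.__cause__
    return False


def run_with_deadlock_retry(fn, retries=3, base_delay=0.1, sleep=time.sleep):
    """Call fn(); on a deadlock, back off exponentially and try again.

    fn is assumed to run one whole transaction, including rollback on
    failure, so that each attempt starts from a clean state.
    """
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception as exc:
            # Re-raise anything that is not a deadlock, or the final failure.
            if not is_deadlock(exc) or attempt == retries:
                raise
            sleep(base_delay * 2 ** attempt)
```

A retry like this would only shorten the stall; it would not stop several replicas from issuing the same UPDATE at the same moment.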
   
   
   ### What you think should happen instead
   
   Trigger timeout checks should be synchronized across multiple schedulers 
so that only one replica runs the check at a time. I'm not sure how best to 
implement this, but a lock across replicas (perhaps using the database) is one 
option. Perhaps the Airflow core team knows of a better way to do this?
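
To make the database-lock idea concrete, here is a minimal sketch that assumes MySQL's named locks (`GET_LOCK` / `RELEASE_LOCK`) and a DB-API cursor that uses `%s` placeholders, as MySQLdb does. The helper `mysql_advisory_lock` and the lock name are hypothetical and not part of Airflow; whether this fits the scheduler's session handling is an open question.

```python
from contextlib import contextmanager


@contextmanager
def mysql_advisory_lock(cursor, name, timeout=0):
    """Try to take a MySQL named lock; yield True if acquired, else False.

    GET_LOCK returns 1 when the lock is obtained, 0 on timeout, and NULL
    on error. With timeout=0 the call does not wait for the lock.
    """
    cursor.execute("SELECT GET_LOCK(%s, %s)", (name, timeout))
    row = cursor.fetchone()
    acquired = bool(row) and row[0] == 1
    try:
        yield acquired
    finally:
        # Only release a lock this connection actually holds.
        if acquired:
            cursor.execute("SELECT RELEASE_LOCK(%s)", (name,))
            cursor.fetchone()


def check_timeouts_once(cursor, run_update):
    """Run run_update() only if no other replica holds the lock (hypothetical)."""
    with mysql_advisory_lock(cursor, "check_trigger_timeouts") as acquired:
        if acquired:
            run_update()
        return acquired
```

With this shape, a replica that fails to get the lock simply skips the check for that interval instead of queueing behind the same UPDATE.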
   
   ### How to reproduce
   
   This is a bit difficult to reproduce since it is an intermittent issue, but 
the general approach would be to run multiple scheduler replicas, run multiple 
triggerers, set AIRFLOW__SCHEDULER__TRIGGER_TIMEOUT_CHECK_INTERVAL to a low 
enough value, have multiple sensors outstanding, and allow the schedulers to 
loop until a deadlock is hit.
   
   ### Operating System
   
   Debian 11
   
   ### Versions of Apache Airflow Providers
   
   I don't think this is relevant here
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   KubernetesExecutor, MySQL 8, 5 Scheduler Replicas, 3 Triggerer Processes
   
   ### Anything else
   
   Would be happy to help submit a PR but would need guidance on implementation 
:-)
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

