ephraimbuddy opened a new pull request #18975:
URL: https://github.com/apache/airflow/pull/18975
Running multiple schedulers can cause a deadlock in `_process_executor_events`.
This PR fixes it.
The error:
```
[2021-10-14 08:37:18,377: INFO/ForkPoolWorker-1] Celery task ID:
6e8e34d/persistence.py", line 995, in _emit_update_statements
statement, multiparams
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
line 1011, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py",
line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
line 1130, in _execute_clauseelement
distilled_params,
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
line 1317, in _execute_context
e, statement, parameters, cursor, context
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
line 1511, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py",
line 182, in raise_
raise exception
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
line 1257, in _execute_context
cursor, statement, parameters, context
File
"/usr/local/lib/python3.6/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py",
line 912, in do_executemany
cursor.executemany(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock
detected
DETAIL: Process 35678 waits for ShareLock on transaction 150292; blocked by
process 58800.
Process 58800 waits for ShareLock on transaction 150291; blocked by process
58799.
Process 58799 waits for ShareLock on transaction 150289; blocked by process
35678.
HINT: See server log for query details.
CONTEXT: while updating tuple (11,54) in relation "task_instance"
[SQL: UPDATE task_instance SET external_executor_id=%(external_executor_id)s
WHERE task_instance.task_id = %(task_instance_task_id)s AND
task_instance.dag_id = %(task_instance_dag_id)s AND task_instance.run_id =
%(task_instance_run_id)s]
[parameters: ({'external_executor_id':
'fe1c200a-ca4a-49b2-b5f9-ea53d5ed0846', 'task_instance_task_id':
'test_sensor_15', 'task_instance_dag_id': 'airflow_bug',
'task_instance_run_id': 'manual__2021-10-14T08:17:18.387618+00:00'},
{'external_executor_id': '1ee3e79a-ceab-4215-94cb-100d8de73bad',
'task_instance_task_id': 'test_sensor_22', 'task_instance_dag_id':
'airflow_bug', 'task_instance_run_id':
'manual__2021-10-14T08:17:18.387618+00:00'}, {'external_executor_id':
'8873162e-519f-4ef2-890c-d38754760c81', 'task_instance_task_id':
'test_sensor_24', 'task_instance_dag_id': 'airflow_bug',
'task_instance_run_id': 'manual__2021-10-14T08:17:18.387618+00:00'},
{'external_executor_id': '52d05186-5297-4079-b94a-132bef845673',
'task_instance_task_id': 'test_sensor_27', 'task_instance_dag_id':
'airflow_bug', 'task_instance_run_id':
'manual__2021-10-14T08:17:18.387618+00:00'}, {'external_executor_id':
'1e47d3a0-4398-4f07-9ebd-73e16ffa02ec', 'task_instance_task_id':
'test_sensor_28', 'task_instance_dag_id': 'airflow_bug',
'task_instance_run_id':
'manual__2021-10-14T08:17:18.387618+00:00'}, {'external_executor_id':
'1c89c6e0-8cb6-437b-9ee5-56968bb993fc', 'task_instance_task_id':
'test_sensor_35', 'task_instance_dag_id': 'airflow_bug',
'task_instance_run_id': 'manual__2021-10-14T08:17:18.387618+00:00'})]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
```
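The DETAIL lines in the log above describe a wait-for cycle among three database processes. As a minimal sketch, the cycle can be reconstructed from the message text. The helper name `find_wait_cycle` and the regular expression are illustrative assumptions, not part of Airflow or PostgreSQL:

```python
import re

# DETAIL text copied from the log above, with the wrapped lines rejoined.
DETAIL = (
    "Process 35678 waits for ShareLock on transaction 150292; blocked by process 58800. "
    "Process 58800 waits for ShareLock on transaction 150291; blocked by process 58799. "
    "Process 58799 waits for ShareLock on transaction 150289; blocked by process 35678."
)

# Captures the waiting PID and the PID that blocks it.
WAIT_RE = re.compile(
    r"Process (\d+) waits for \w+ on transaction \d+; blocked by process (\d+)"
)


def find_wait_cycle(detail):
    """Return the PIDs that form a wait-for cycle, or [] if there is none.

    Hypothetical helper for illustration only.
    """
    blocked_by = {int(waiter): int(blocker) for waiter, blocker in WAIT_RE.findall(detail)}
    for start in blocked_by:
        path = [start]
        current = blocked_by[start]
        # Follow the "blocked by" edges until we leave the graph or revisit a PID.
        while current in blocked_by and current not in path:
            path.append(current)
            current = blocked_by[current]
        if current == start:
            return path
    return []


cycle = find_wait_cycle(DETAIL)
print(cycle)
```

Each process in the returned list waits on the next one, and the last waits on the first, which is why PostgreSQL aborts one of the transactions.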
I tested the fix with 3 schedulers and it works.
How to reproduce:
With 2 schedulers and the Celery executor, run this DAG:
[dags.zip](https://github.com/apache/airflow/files/7345217/dags.zip)
After some time, one scheduler will stop with errors like the one above.
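One general way to reduce deadlocks of this kind is to have every writer touch rows in the same order, so that concurrent batches queue behind each other instead of forming a cycle. The sketch below is not the change made in this PR. It only illustrates the idea on parameter dictionaries shaped like those in the log above, and the function name `order_update_params` is hypothetical:

```python
def order_update_params(params):
    """Sort executemany parameters by the task_instance key columns.

    Hypothetical helper: a consistent order means two writers that update
    overlapping rows request the row locks in the same sequence.
    """
    return sorted(
        params,
        key=lambda p: (
            p["task_instance_dag_id"],
            p["task_instance_run_id"],
            p["task_instance_task_id"],
        ),
    )


# Two of the parameter sets from the log above, deliberately out of order.
batch = [
    {
        "external_executor_id": "1ee3e79a-ceab-4215-94cb-100d8de73bad",
        "task_instance_task_id": "test_sensor_22",
        "task_instance_dag_id": "airflow_bug",
        "task_instance_run_id": "manual__2021-10-14T08:17:18.387618+00:00",
    },
    {
        "external_executor_id": "fe1c200a-ca4a-49b2-b5f9-ea53d5ed0846",
        "task_instance_task_id": "test_sensor_15",
        "task_instance_dag_id": "airflow_bug",
        "task_instance_run_id": "manual__2021-10-14T08:17:18.387618+00:00",
    },
]

ordered = order_update_params(batch)
print([p["task_instance_task_id"] for p in ordered])
```

Ordering helps only if every writer applies it. Taking explicit row locks before updating (for example `SELECT ... FOR UPDATE`) is another common approach.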