ephraimbuddy opened a new pull request #18975:
URL: https://github.com/apache/airflow/pull/18975


   Running multiple schedulers can cause a deadlock in `_process_executor_events`.
   This PR fixes it.
   
   The error:
   ```
   [2021-10-14 08:37:18,377: INFO/ForkPoolWorker-1] Celery task ID: 
6e8e34d/persistence.py", line 995, in _emit_update_statements
       statement, multiparams
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
       return meth(self, multiparams, params)
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
       return connection._execute_clauseelement(self, multiparams, params)
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
       distilled_params,
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
       e, statement, parameters, cursor, context
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
       sqlalchemy_exception, with_traceback=exc_info[2], from_=e
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1257, in _execute_context
       cursor, statement, parameters, context
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 912, in do_executemany
       cursor.executemany(statement, parameters)
   sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
   DETAIL:  Process 35678 waits for ShareLock on transaction 150292; blocked by process 58800.
   Process 58800 waits for ShareLock on transaction 150291; blocked by process 58799.
   Process 58799 waits for ShareLock on transaction 150289; blocked by process 35678.
   HINT:  See server log for query details.
   CONTEXT:  while updating tuple (11,54) in relation "task_instance"
   
   [SQL: UPDATE task_instance SET external_executor_id=%(external_executor_id)s WHERE task_instance.task_id = %(task_instance_task_id)s AND task_instance.dag_id = %(task_instance_dag_id)s AND task_instance.run_id = %(task_instance_run_id)s]
   [parameters: (
     {'external_executor_id': 'fe1c200a-ca4a-49b2-b5f9-ea53d5ed0846', 'task_instance_task_id': 'test_sensor_15', 'task_instance_dag_id': 'airflow_bug', 'task_instance_run_id': 'manual__2021-10-14T08:17:18.387618+00:00'},
     {'external_executor_id': '1ee3e79a-ceab-4215-94cb-100d8de73bad', 'task_instance_task_id': 'test_sensor_22', 'task_instance_dag_id': 'airflow_bug', 'task_instance_run_id': 'manual__2021-10-14T08:17:18.387618+00:00'},
     {'external_executor_id': '8873162e-519f-4ef2-890c-d38754760c81', 'task_instance_task_id': 'test_sensor_24', 'task_instance_dag_id': 'airflow_bug', 'task_instance_run_id': 'manual__2021-10-14T08:17:18.387618+00:00'},
     {'external_executor_id': '52d05186-5297-4079-b94a-132bef845673', 'task_instance_task_id': 'test_sensor_27', 'task_instance_dag_id': 'airflow_bug', 'task_instance_run_id': 'manual__2021-10-14T08:17:18.387618+00:00'},
     {'external_executor_id': '1e47d3a0-4398-4f07-9ebd-73e16ffa02ec', 'task_instance_task_id': 'test_sensor_28', 'task_instance_dag_id': 'airflow_bug', 'task_instance_run_id': 'manual__2021-10-14T08:17:18.387618+00:00'},
     {'external_executor_id': '1c89c6e0-8cb6-437b-9ee5-56968bb993fc', 'task_instance_task_id': 'test_sensor_35', 'task_instance_dag_id': 'airflow_bug', 'task_instance_run_id': 'manual__2021-10-14T08:17:18.387618+00:00'})]
   (Background on this error at: http://sqlalche.me/e/13/e3q8)
   ```
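The DETAIL section shows why PostgreSQL reported a deadlock: each line is an edge in the wait-for graph between backend processes, and the three edges close a loop (35678 → 58800 → 58799 → 35678). Below is a small sketch that parses those lines and finds the cycle, assuming the message format shown above. The helper names `parse_wait_for_graph` and `find_cycle` are hypothetical and not part of Airflow or PostgreSQL.

```python
import re
from typing import Dict, List

# DETAIL lines copied (unwrapped) from the PostgreSQL error above.
DETAIL = """\
Process 35678 waits for ShareLock on transaction 150292; blocked by process 58800.
Process 58800 waits for ShareLock on transaction 150291; blocked by process 58799.
Process 58799 waits for ShareLock on transaction 150289; blocked by process 35678.
"""

# Assumes the exact "Process X waits for <lock> on transaction N; blocked by process Y." wording.
EDGE_RE = re.compile(
    r"Process (\d+) waits for \w+ on transaction \d+; blocked by process (\d+)\."
)


def parse_wait_for_graph(detail: str) -> Dict[int, int]:
    """Map each waiting backend PID to the PID that blocks it."""
    return {int(waiter): int(blocker) for waiter, blocker in EDGE_RE.findall(detail)}


def find_cycle(graph: Dict[int, int]) -> List[int]:
    """Follow wait-for edges from each node and return the first cycle found, or []."""
    for start in graph:
        path: List[int] = []
        position: Dict[int, int] = {}  # node -> index in path
        node = start
        while node in graph and node not in position:
            position[node] = len(path)
            path.append(node)
            node = graph[node]
        if node in position:
            # We came back to a node already on this path: that suffix is the cycle.
            return path[position[node]:]
    return []


if __name__ == "__main__":
    print(find_cycle(parse_wait_for_graph(DETAIL)))
```

Running it prints the three PIDs in wait order, `[35678, 58800, 58799]`, i.e. no transaction in the loop can proceed until PostgreSQL aborts one of them.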
   
   I tested the fix with 3 schedulers and it works.
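The description does not say how the fix works. Independently of this PR, PostgreSQL resolves a deadlock by aborting one of the transactions involved, so a general application-side safeguard is to roll back and retry the whole transaction a bounded number of times instead of letting the error stop the scheduler. A minimal, hypothetical sketch of that pattern follows; `DeadlockDetected` and `retry_on_deadlock` are illustrative stand-ins, not Airflow or psycopg2 APIs.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class DeadlockDetected(Exception):
    """Hypothetical stand-in for the driver's deadlock error (in the log above,
    psycopg2.errors.DeadlockDetected wrapped in sqlalchemy.exc.OperationalError)."""


def retry_on_deadlock(fn: Callable[[], T], attempts: int = 3, base_delay: float = 0.05) -> T:
    """Call fn(), retrying up to `attempts` times in total if it raises DeadlockDetected.

    fn is assumed to run one complete transaction, and a failed attempt is assumed
    to be rolled back so that each retry starts from a clean state.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except DeadlockDetected:
            if attempt == attempts:
                raise  # out of attempts: surface the original error
            # Linear backoff with jitter so competing schedulers do not retry in lockstep.
            time.sleep(base_delay * attempt * random.uniform(0.5, 1.5))
    raise AssertionError("unreachable")
```

Retrying only limits the impact of a deadlock; it does not remove the lock-ordering conflict that causes it.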
   
   How to reproduce:
   Using two schedulers and the Celery executor, run this DAG:
   [dags.zip](https://github.com/apache/airflow/files/7345217/dags.zip)
   
   After some time, one of the schedulers stops with an error like the one above.
   
   
   


-- 
This is an automated message from the Apache Git Service.