stablum opened a new issue #19957:
URL: https://github.com/apache/airflow/issues/19957


   ### Apache Airflow version
   
   2.2.2 (latest released)
   
   ### Operating System
   
   Ubuntu 21.04 on a VM
   
   ### Versions of Apache Airflow Providers
   
   root@AI-Research:~/learning_sets/airflow# pip freeze | grep apache-airflow-providers
   apache-airflow-providers-ftp==2.0.1
   apache-airflow-providers-http==2.0.1
   apache-airflow-providers-imap==2.0.1
   apache-airflow-providers-sqlite==2.0.1
   
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   Airflow is at version 2.2.2
   psql (PostgreSQL) 13.5 (Ubuntu 13.5-0ubuntu0.21.04.1)
   
   The DAG contains thousands of tasks that download, preprocess, and prepare data destined for a MongoDB database (so I'm not using PostgreSQL inside my tasks).
   
   ### What happened
   
   [2021-12-01 19:41:57,556] {scheduler_job.py:644} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
   Traceback (most recent call last):
     File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
       self.dialect.do_execute(
     File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
       cursor.execute(statement, parameters)
   psycopg2.errors.DeadlockDetected: deadlock detected
   DETAIL:  Process 322086 waits for ShareLock on transaction 2391367; blocked by process 340345.
   Process 340345 waits for AccessExclusiveLock on tuple (0,26) of relation 19255 of database 19096; blocked by process 340300.
   Process 340300 waits for ShareLock on transaction 2391361; blocked by process 322086.
   HINT:  See server log for query details.
   CONTEXT:  while updating tuple (1335,10) in relation "task_instance"


   The above exception was the direct cause of the following exception:

   Traceback (most recent call last):
     File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 628, in _execute
       self._run_scheduler_loop()
     File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 709, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 792, in _do_scheduling
       callback_to_run = self._schedule_dag_run(dag_run, session)
     File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 1049, in _schedule_dag_run
       dag_run.schedule_tis(schedulable_tis, session)
     File "/usr/local/lib/python3.9/dist-packages/airflow/utils/session.py", line 67, in wrapper
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.9/dist-packages/airflow/models/dagrun.py", line 898, in schedule_tis
       session.query(TI)
     File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/query.py", line 4063, in update
       update_op.exec_()
     File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1697, in exec_
       self._do_exec()
     File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1895, in _do_exec
       self._execute_stmt(update_stmt)
     File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1702, in _execute_stmt
       self.result = self.query._execute_crud(stmt, self.mapper)
     File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/query.py", line 3568, in _execute_crud
       return conn.execute(stmt, self._params)
     File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1011, in execute
       return meth(self, multiparams, params)
     File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
       return connection._execute_clauseelement(self, multiparams, params)
     File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
       ret = self._execute_context(
     File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
       self._handle_dbapi_exception(
     File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
       util.raise_(
     File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
       self.dialect.do_execute(
     File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
       cursor.execute(statement, parameters)
   sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
   DETAIL:  Process 322086 waits for ShareLock on transaction 2391367; blocked by process 340345.
   Process 340345 waits for AccessExclusiveLock on tuple (0,26) of relation 19255 of database 19096; blocked by process 340300.
   Process 340300 waits for ShareLock on transaction 2391361; blocked by process 322086.
   HINT:  See server log for query details.
   CONTEXT:  while updating tuple (1335,10) in relation "task_instance"

   [SQL: UPDATE task_instance SET state=%(state)s WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.run_id = %(run_id_1)s AND task_instance.task_id IN (%(task_id_1)s, %(task_id_2)s, %(task_id_3)s, %(task_id_4)s, %(task_id_5)s, %(task_id_6)s, %(task_id_7)s, %(task_id_8)s, %(task_id_9)s, %(task_id_10)s, %(task_id_11)s, %(task_id_12)s, %(task_id_13)s, %(task_id_14)s, %(task_id_15)s, %(task_id_16)s, %(task_id_17)s, %(task_id_18)s, %(task_id_19)s, %(task_id_20)s)]
   [parameters: {'state': <TaskInstanceState.SCHEDULED: 'scheduled'>, 'dag_id_1': 'download_and_preprocess_sets', 'run_id_1': 'manual__2021-12-01T17:31:23.684597+00:00', 'task_id_1': 'download_1379', 'task_id_2': 'download_1438', 'task_id_3': 'download_1363', 'task_id_4': 'download_1368', 'task_id_5': 'download_138', 'task_id_6': 'download_1432', 'task_id_7': 'download_1435', 'task_id_8': 'download_1437', 'task_id_9': 'download_1439', 'task_id_10': 'download_1457', 'task_id_11': 'download_168', 'task_id_12': 'download_203', 'task_id_13': 'download_782', 'task_id_14': 'download_1430', 'task_id_15': 'download_1431', 'task_id_16': 'download_1436', 'task_id_17': 'download_167', 'task_id_18': 'download_174', 'task_id_19': 'download_205', 'task_id_20': 'download_1434'}]
   (Background on this error at: http://sqlalche.me/e/13/e3q8)
   [2021-12-01 19:41:57,566] {local_executor.py:388} INFO - Shutting down LocalExecutor; waiting for running tasks to finish.  Signal again if you don't want to wait.
   [2021-12-01 19:42:18,013] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 285470
   [2021-12-01 19:42:18,105] {process_utils.py:66} INFO - Process psutil.Process(pid=285470, status='terminated', exitcode=0, started='18:56:21') (285470) terminated with exit code 0
   [2021-12-01 19:42:18,106] {scheduler_job.py:655} INFO - Exited execute loop
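   For context on the failure mode: PostgreSQL resolves a deadlock by aborting one of the competing transactions, so the aborted side can simply re-run its transaction. A minimal retry-wrapper sketch in plain Python (the `DeadlockDetected` stand-in class and `run_with_retries` name are illustrative, not Airflow's or psycopg2's actual code):

```python
import time


class DeadlockDetected(Exception):
    """Stand-in for psycopg2.errors.DeadlockDetected (illustrative only)."""


def run_with_retries(txn, max_attempts=3, base_delay=0.0):
    """Run a transactional callable, retrying if the database aborts it
    as a deadlock victim.

    Re-running the victim transaction usually succeeds, because by then
    the other lock holder has committed or rolled back.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return txn()
        except DeadlockDetected:
            if attempt == max_attempts:
                raise  # give up after the last attempt
            time.sleep(base_delay * attempt)  # linear backoff before retrying


# Usage sketch: a transaction aborted twice as a deadlock victim, then succeeding.
attempts = []

def flaky_update():
    attempts.append(1)
    if len(attempts) < 3:
        raise DeadlockDetected("deadlock detected")
    return "updated"

result = run_with_retries(flaky_update)
```

   A scheduler-level retry of this shape would mask the transient deadlock instead of crashing the whole scheduler loop.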
   
   
   ### What you expected to happen
   
   I expected the scheduler to keep running rather than crash on a deadlock. Maybe 24 concurrent processes/tasks are too many?
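   If contention from too many simultaneous tasks is the trigger, the concurrency can be capped in airflow.cfg (a sketch; the value 8 is arbitrary, and `max_active_tasks_per_dag` is the Airflow 2.2 name for the former `dag_concurrency`):

```ini
[core]
# Upper bound on task instances running across the whole installation
parallelism = 8
# Upper bound on concurrently running task instances per DAG
max_active_tasks_per_dag = 8
```

   Lowering these does not fix the deadlock itself, but it can reduce how often the scheduler's bulk task_instance UPDATE collides with other transactions.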
   
   ### How to reproduce
   
   Reproducibility is challenging, but the exception above may provide enough information for a fix.
   
   ### Anything else
   
   It happens every time, some time after the DAG has started running.
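   Since the error's HINT says "See server log for query details", enabling lock-wait logging on the PostgreSQL side would reveal which two statements are colliding (postgresql.conf settings; values shown are illustrative, and `1s` is the default):

```ini
# postgresql.conf — log queries involved in lock waits and deadlocks
log_lock_waits = on     # log sessions that wait longer than deadlock_timeout
deadlock_timeout = 1s   # deadlock-check interval; also the logging threshold
```

   After a reload (`SELECT pg_reload_conf();`), the server log should show the full conflicting queries for the processes named in the DETAIL lines.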
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

