nehemiascr commented on issue #19957:
URL: https://github.com/apache/airflow/issues/19957#issuecomment-1593263561

   I am experiencing this issue in 2.5.1 when I try to change the state of 100
task instances of a dag run through the REST API. I am calling the
`api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}` endpoint
concurrently, and then it happens (a sketch of the client code is at the end
of this comment):
   ```
   [SQL: SELECT task_instance.try_number AS task_instance_try_number, 
task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS 
task_instance_dag_id, task_instance.run_id AS task_instance_run_id, 
task_instance.map_index AS task_instance_map_index, task_instance.start_date AS 
task_instance_start_date, task_instance.end_date AS task_instance_end_date, 
task_instance.duration AS task_instance_duration, task_instance.state AS 
task_instance_state, task_instance.max_tries AS task_instance_max_tries, 
task_instance.hostname AS task_instance_hostname, task_instance.unixname AS 
task_instance_unixname, task_instance.job_id AS task_instance_job_id, 
task_instance.pool AS task_instance_pool, task_instance.pool_slots AS 
task_instance_pool_slots, task_instance.queue AS task_instance_queue, 
task_instance.priority_weight AS task_instance_priority_weight, 
task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS 
task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, 
task_instance.executor_config AS task_instance_executor_config, 
task_instance.updated_at AS task_instance_updated_at, 
task_instance.external_executor_id AS task_instance_external_executor_id, 
task_instance.trigger_id AS task_instance_trigger_id, 
task_instance.trigger_timeout AS task_instance_trigger_timeout, 
task_instance.next_method AS task_instance_next_method, 
task_instance.next_kwargs AS task_instance_next_kwargs
   FROM task_instance
   WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.run_id IN 
(%(run_id_1_1)s) AND task_instance.task_id IN (%(task_id_1_1)s, 
%(task_id_1_2)s, %(task_id_1_3)s, %(task_id_1_4)s, %(task_id_1_5)s, 
%(task_id_1_6)s, %(task_id_1_7)s, %(task_id_1_8)s, %(task_id_1_9)s, 
%(task_id_1_10)s, %(task_id_1_11)s, %(task_id_1_12)s, %(task_id_1_13)s, 
%(task_id_1_14)s, %(task_id_1_15)s, %(task_id_1_16)s, %(task_id_1_17)s, 
%(task_id_1_18)s, %(task_id_1_19)s, %(task_id_1_20)s, %(task_id_1_21)s, 
%(task_id_1_22)s, %(task_id_1_23)s, %(task_id_1_24)s, %(task_id_1_25)s, 
%(task_id_1_26)s, %(task_id_1_27)s, %(task_id_1_28)s, %(task_id_1_29)s, 
%(task_id_1_30)s, %(task_id_1_31)s, %(task_id_1_32)s, %(task_id_1_33)s, 
%(task_id_1_34)s, %(task_id_1_35)s, %(task_id_1_36)s, %(task_id_1_37)s, 
%(task_id_1_38)s, %(task_id_1_39)s, %(task_id_1_40)s, %(task_id_1_41)s, 
%(task_id_1_42)s, %(task_id_1_43)s, %(task_id_1_44)s, %(task_id_1_45)s, 
%(task_id_1_46)s, %(task_id_1_47)s, %(task_id_1_48)s, %(task_id_1_49)s, %(task_id_1_50)s, %(task_id_1_51)s, %(task_id_1_52)s, %(task_id_1_53)s) AND 
(task_instance.state IS NULL OR task_instance.state != %(state_1)s) FOR UPDATE]
   [parameters: {'dag_id_1': 'publish_pipeline', 'state_1': 'failed', 
'run_id_1_1': 'api_2023-06-15T14:50:02.094531-db-857', 'task_id_1_1': 
'agg_historical_weekly_rollup_29', 'task_id_1_2': 
'agg_historical_weekly_calculate_metrics_49', 'task_id_1_3': 
'agg_historical_weekly_rollup_50', 'task_id_1_4': 'pipeline_end', 
'task_id_1_5': 'agg_historical_weekly_rollup_37', 'task_id_1_6': 
'agg_historical_weekly_rollup_42', 'task_id_1_7': 
'agg_historical_weekly_calculate_metrics_40', 'task_id_1_8': 
'agg_historical_weekly_calculate_metrics_34', 'task_id_1_9': 
'agg_historical_weekly_rollup_30', 'task_id_1_10': 
'agg_historical_weekly_rollup_43', 'task_id_1_11': 
'agg_historical_weekly_rollup_41', 'task_id_1_12': 'publish_job_log_finalizer', 
'task_id_1_13': 'agg_historical_weekly_calculate_metrics_33', 'task_id_1_14': 
'agg_historical_weekly_calculate_metrics_46', 'task_id_1_15': 
'agg_historical_weekly_rollup_47', 'task_id_1_16': 
'agg_historical_weekly_calculate_metrics_42', 'task_id_1_17': 'agg_historical_weekly_rollup_51', 'task_id_1_18': 
'agg_historical_weekly_calculate_metrics_39', 'task_id_1_19': 
'agg_historical_weekly_rollup_36', 'task_id_1_20': 
'agg_historical_weekly_rollup_45', 'task_id_1_21': 
'agg_historical_weekly_rollup_48', 'task_id_1_22': 'wrap_up_publish', 
'task_id_1_23': 'agg_historical_weekly_rollup_32', 'task_id_1_24': 
'agg_historical_weekly_rollup_34', 'task_id_1_25': 
'agg_historical_weekly_rollup_33', 'task_id_1_26': 
'agg_historical_weekly_rollup_46', 'task_id_1_27': 
'agg_historical_weekly_rollup_49', 'task_id_1_28': 
'agg_historical_weekly_calculate_metrics_45', 'task_id_1_29': 
'agg_historical_weekly_rollup_39', 'task_id_1_30': 'send_email_on_fail', 
'task_id_1_31': 'agg_historical_weekly_rollup_35', 'task_id_1_32': 
'agg_historical_weekly_rollup_38', 'task_id_1_33': 
'agg_historical_weekly_calculate_metrics_43', 'task_id_1_34': 
'agg_historical_weekly_calculate_metrics_36', 'task_id_1_35': 
'agg_historical_weekly_calculate_metrics_51', 'task_id_1_36': 'db_quality_check', 'task_id_1_37': 'agg_historical_weekly_calculate_metrics_37', 
'task_id_1_38': 'agg_historical_weekly_calculate_metrics_32', 'task_id_1_39': 
'agg_historical_weekly_calculate_metrics_30', 'task_id_1_40': 
'agg_historical_weekly_calculate_metrics_44', 'task_id_1_41': 
'agg_historical_weekly_calculate_metrics_50', 'task_id_1_42': 
'agg_historical_weekly_calculate_metrics_48', 'task_id_1_43': 
'agg_historical_weekly_calculate_metrics_41', 'task_id_1_44': 
'agg_historical_weekly_calculate_metrics_35', 'task_id_1_45': 
'agg_historical_weekly_calculate_metrics_38', 'task_id_1_46': 
'agg_historical_weekly_rollup_31', 'task_id_1_47': 
'agg_historical_weekly_rollup_40', 'task_id_1_48': 
'agg_historical_weekly_rollup_44', 'task_id_1_49': 
'agg_historical_weekly_calculate_metrics_52', 'task_id_1_50': 
'agg_historical_weekly_calculate_metrics_31', 'task_id_1_51': 
'agg_historical_weekly_rollup_52', 'task_id_1_52': 
'historical_weekly_aggregations_done', 'task_id_1_53': 
'agg_historical_weekly_calculate_metrics_47'}]
   (Background on this error at: https://sqlalche.me/e/14/e3q8)
   10.41.6.131 - airflowdags [15/Jun/2023:15:01:02 +0000] "POST 
/api/v1/dags/publish_pipeline/updateTaskInstancesState HTTP/1.1" 500 1560 "-" 
"python-httpx/0.24.1"
   [2023-06-15 15:01:02,879] {manager.py:226} INFO - Updated user admin admin
   [2023-06-15 15:01:03,842] {app.py:1741} ERROR - Exception on /api/v1/dags/publish_pipeline/updateTaskInstancesState [POST]
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", 
line 1802, in _execute_context
       self.dialect.do_execute(
     File 
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py",
 line 719, in do_execute
       cursor.execute(statement, parameters)
   psycopg2.errors.DeadlockDetected: deadlock detected
   DETAIL:  Process 19389 waits for ShareLock on transaction 34948119; blocked 
by process 19388.
   Process 19388 waits for ShareLock on transaction 34948109; blocked by 
process 19389.
   HINT:  See server log for query details.
   CONTEXT:  while updating tuple (1379,12) in relation "dag_run"
   
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 
2525, in wsgi_app
       response = self.full_dispatch_request()
     File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 
1822, in full_dispatch_request
       rv = self.handle_user_exception(e)
     File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 
1820, in full_dispatch_request
       rv = self.dispatch_request()
     File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 
1796, in dispatch_request
       return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/decorator.py",
 line 68, in wrapper
       response = function(request)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/uri_parsing.py",
 line 149, in wrapper
       response = function(request)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/validation.py",
 line 196, in wrapper
       response = function(request)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/validation.py",
 line 399, in wrapper
       return function(request)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/response.py",
 line 112, in wrapper
       response = function(request)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/parameter.py",
 line 120, in wrapper
       return function(**kwargs)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/api_connexion/security.py",
 line 51, in decorated
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", 
line 75, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/api_connexion/endpoints/task_instance_endpoint.py",
 line 539, in post_set_task_instances_state
       tis = dag.set_task_instance_state(
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", 
line 72, in wrapper
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dag.py", line 
1888, in set_task_instance_state
       subdag.clear(
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", 
line 72, in wrapper
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dag.py", line 
2051, in clear
       clear_task_instances(
     File 
"/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py",
 line 281, in clear_task_instances
       session.flush()
     File 
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", 
line 3345, in flush
       self._flush(objects)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", 
line 3485, in _flush
       transaction.rollback(_capture_exception=True)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py",
 line 70, in __exit__
       compat.raise_(
     File 
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", 
line 207, in raise_
       raise exception
     File 
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", 
line 3445, in _flush
       flush_context.execute()
     File 
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py",
 line 456, in execute
       rec.execute(self)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py",
 line 630, in execute
       util.preloaded.orm_persistence.save_obj(
     File 
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py",
 line 236, in save_obj
       _emit_update_statements(
     File 
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py",
 line 1000, in _emit_update_statements
       c = connection._execute_20(
     File 
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", 
line 1614, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
     File 
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", 
line 325, in _execute_on_connection
       return connection._execute_clauseelement(
     File 
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", 
line 1481, in _execute_clauseelement
       ret = self._execute_context(
     File 
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", 
line 1845, in _execute_context
       self._handle_dbapi_exception(
     File 
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", 
line 2026, in _handle_dbapi_exception
       util.raise_(
     File 
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", 
line 207, in raise_
       raise exception
     File 
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", 
line 1802, in _execute_context
       self.dialect.do_execute(
     File 
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py",
 line 719, in do_execute
       cursor.execute(statement, parameters)
   sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock 
detected
   DETAIL:  Process 19389 waits for ShareLock on transaction 34948119; blocked 
by process 19388.
   Process 19388 waits for ShareLock on transaction 34948109; blocked by 
process 19389.
   HINT:  See server log for query details.
   CONTEXT:  while updating tuple (1379,12) in relation "dag_run"
   
   [SQL: UPDATE dag_run SET queued_at=%(queued_at)s, start_date=%(start_date)s, 
state=%(state)s, updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s]
   [parameters: {'queued_at': datetime.datetime(2023, 6, 15, 15, 1, 2, 833204, 
tzinfo=Timezone('UTC')), 'start_date': None, 'state': <DagRunState.QUEUED: 
'queued'>, 'updated_at': datetime.datetime(2023, 6, 15, 15, 1, 2, 837800, 
tzinfo=Timezone('UTC')), 'dag_run_id': 25068}]
   (Background on this error at: https://sqlalche.me/e/14/e3q8)
   10.41.6.185 - airflowdags [15/Jun/2023:15:01:03 +0000] "POST 
/api/v1/dags/publish_pipeline/updateTaskInstancesState HTTP/1.1" 500 1560 "-" 
"python-httpx/0.24.1"
   ```
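
   From the log, each request appears to take row locks on the selected
task_instance rows (the `SELECT ... FOR UPDATE` above) and then update the
same `dag_run` row, so two overlapping requests can end up waiting on each
other's locks. For context, this is roughly how the calls are issued; a
minimal sketch, assuming `httpx` and basic auth, with the URL and credentials
as placeholders and one POST per task id against the
`updateTaskInstancesState` endpoint that appears in the access log above:
   ```
   import asyncio

   import httpx

   AIRFLOW_URL = "http://localhost:8080"  # placeholder: webserver URL
   AUTH = ("airflowdags", "***")          # placeholder: basic-auth credentials
   DAG_ID = "publish_pipeline"
   DAG_RUN_ID = "api_2023-06-15T14:50:02.094531-db-857"
   # Illustrative subset of the ~100 task ids being set to failed concurrently.
   TASK_IDS = [f"agg_historical_weekly_rollup_{i}" for i in range(29, 53)]


   async def fail_task(client: httpx.AsyncClient, task_id: str) -> None:
       # One POST per task id; all of them are in flight at once, so several
       # transactions lock overlapping task_instance rows and then update the
       # same dag_run row, as in the deadlock above.
       resp = await client.post(
           f"{AIRFLOW_URL}/api/v1/dags/{DAG_ID}/updateTaskInstancesState",
           json={
               "dag_run_id": DAG_RUN_ID,
               "task_id": task_id,
               "new_state": "failed",
               "dry_run": False,
           },
       )
       print(task_id, resp.status_code)


   async def main() -> None:
       async with httpx.AsyncClient(auth=AUTH, timeout=60.0) as client:
           await asyncio.gather(*(fail_task(client, t) for t in TASK_IDS))


   asyncio.run(main())
   ```
   With `asyncio.gather` every request is in flight simultaneously, which
maximizes the chance that two transactions each hold a lock the other needs.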

