nehemiascr commented on issue #19957:
URL: https://github.com/apache/airflow/issues/19957#issuecomment-1593263561
I am experiencing this issue in Airflow 2.5.1 when I try to change the state of 100
task instances of a DAG run through the REST API. I am calling the
`api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}` endpoint
concurrently, and then the following error occurs:
```
[SQL: SELECT task_instance.try_number AS task_instance_try_number,
task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS
task_instance_dag_id, task_instance.run_id AS task_instance_run_id,
task_instance.map_index AS task_instance_map_index, task_instance.start_date AS
task_instance_start_date, task_instance.end_date AS task_instance_end_date,
task_instance.duration AS task_instance_duration, task_instance.state AS
task_instance_state, task_instance.max_tries AS task_instance_max_tries,
task_instance.hostname AS task_instance_hostname, task_instance.unixname AS
task_instance_unixname, task_instance.job_id AS task_instance_job_id,
task_instance.pool AS task_instance_pool, task_instance.pool_slots AS
task_instance_pool_slots, task_instance.queue AS task_instance_queue,
task_instance.priority_weight AS task_instance_priority_weight,
task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS
task_instance_queued_dttm, task_instance.queued_by_job_id
AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid,
task_instance.executor_config AS task_instance_executor_config,
task_instance.updated_at AS task_instance_updated_at,
task_instance.external_executor_id AS task_instance_external_executor_id,
task_instance.trigger_id AS task_instance_trigger_id,
task_instance.trigger_timeout AS task_instance_trigger_timeout,
task_instance.next_method AS task_instance_next_method,
task_instance.next_kwargs AS task_instance_next_kwargs
FROM task_instance
WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.run_id IN
(%(run_id_1_1)s) AND task_instance.task_id IN (%(task_id_1_1)s,
%(task_id_1_2)s, %(task_id_1_3)s, %(task_id_1_4)s, %(task_id_1_5)s,
%(task_id_1_6)s, %(task_id_1_7)s, %(task_id_1_8)s, %(task_id_1_9)s,
%(task_id_1_10)s, %(task_id_1_11)s, %(task_id_1_12)s, %(task_id_1_13)s,
%(task_id_1_14)s, %(task_id_1_15)s, %(task_id_1_16)s, %(task_id_1_17)s,
%(task_id_1_18)s, %(task_id_1_19)s, %(task_id_1_20)s, %(task_id_1_21)s,
%(task_id_1_22)s, %(task_id_1_23)s, %(task_id_1_24)s, %(task_id_1_25)s,
%(task_id_1_26)s, %(task_id_1_27)s, %(task_id_1_28)s, %(task_id_1_29)s,
%(task_id_1_30)s, %(task_id_1_31)s, %(task_id_1_32)s, %(task_id_1_33)s,
%(task_id_1_34)s, %(task_id_1_35)s, %(task_id_1_36)s, %(task_id_1_37)s,
%(task_id_1_38)s, %(task_id_1_39)s, %(task_id_1_40)s, %(task_id_1_41)s,
%(task_id_1_42)s, %(task_id_1_43)s, %(task_id_1_44)s, %(task_id_1_45)s,
%(task_id_1_46)s, %(task_id_1_47)s, %(task_id_1_48)s, %(task_id_1_49)s, %(tas
k_id_1_50)s, %(task_id_1_51)s, %(task_id_1_52)s, %(task_id_1_53)s) AND
(task_instance.state IS NULL OR task_instance.state != %(state_1)s) FOR UPDATE]
[parameters: {'dag_id_1': 'publish_pipeline', 'state_1': 'failed',
'run_id_1_1': 'api_2023-06-15T14:50:02.094531-db-857', 'task_id_1_1':
'agg_historical_weekly_rollup_29', 'task_id_1_2':
'agg_historical_weekly_calculate_metrics_49', 'task_id_1_3':
'agg_historical_weekly_rollup_50', 'task_id_1_4': 'pipeline_end',
'task_id_1_5': 'agg_historical_weekly_rollup_37', 'task_id_1_6':
'agg_historical_weekly_rollup_42', 'task_id_1_7':
'agg_historical_weekly_calculate_metrics_40', 'task_id_1_8':
'agg_historical_weekly_calculate_metrics_34', 'task_id_1_9':
'agg_historical_weekly_rollup_30', 'task_id_1_10':
'agg_historical_weekly_rollup_43', 'task_id_1_11':
'agg_historical_weekly_rollup_41', 'task_id_1_12': 'publish_job_log_finalizer',
'task_id_1_13': 'agg_historical_weekly_calculate_metrics_33', 'task_id_1_14':
'agg_historical_weekly_calculate_metrics_46', 'task_id_1_15':
'agg_historical_weekly_rollup_47', 'task_id_1_16':
'agg_historical_weekly_calculate_metrics_42', 'task_id_1_17': 'agg_hist
orical_weekly_rollup_51', 'task_id_1_18':
'agg_historical_weekly_calculate_metrics_39', 'task_id_1_19':
'agg_historical_weekly_rollup_36', 'task_id_1_20':
'agg_historical_weekly_rollup_45', 'task_id_1_21':
'agg_historical_weekly_rollup_48', 'task_id_1_22': 'wrap_up_publish',
'task_id_1_23': 'agg_historical_weekly_rollup_32', 'task_id_1_24':
'agg_historical_weekly_rollup_34', 'task_id_1_25':
'agg_historical_weekly_rollup_33', 'task_id_1_26':
'agg_historical_weekly_rollup_46', 'task_id_1_27':
'agg_historical_weekly_rollup_49', 'task_id_1_28':
'agg_historical_weekly_calculate_metrics_45', 'task_id_1_29':
'agg_historical_weekly_rollup_39', 'task_id_1_30': 'send_email_on_fail',
'task_id_1_31': 'agg_historical_weekly_rollup_35', 'task_id_1_32':
'agg_historical_weekly_rollup_38', 'task_id_1_33':
'agg_historical_weekly_calculate_metrics_43', 'task_id_1_34':
'agg_historical_weekly_calculate_metrics_36', 'task_id_1_35':
'agg_historical_weekly_calculate_metrics_51', 'task_id_1_36': 'db_quality
_check', 'task_id_1_37': 'agg_historical_weekly_calculate_metrics_37',
'task_id_1_38': 'agg_historical_weekly_calculate_metrics_32', 'task_id_1_39':
'agg_historical_weekly_calculate_metrics_30', 'task_id_1_40':
'agg_historical_weekly_calculate_metrics_44', 'task_id_1_41':
'agg_historical_weekly_calculate_metrics_50', 'task_id_1_42':
'agg_historical_weekly_calculate_metrics_48', 'task_id_1_43':
'agg_historical_weekly_calculate_metrics_41', 'task_id_1_44':
'agg_historical_weekly_calculate_metrics_35', 'task_id_1_45':
'agg_historical_weekly_calculate_metrics_38', 'task_id_1_46':
'agg_historical_weekly_rollup_31', 'task_id_1_47':
'agg_historical_weekly_rollup_40', 'task_id_1_48':
'agg_historical_weekly_rollup_44', 'task_id_1_49':
'agg_historical_weekly_calculate_metrics_52', 'task_id_1_50':
'agg_historical_weekly_calculate_metrics_31', 'task_id_1_51':
'agg_historical_weekly_rollup_52', 'task_id_1_52':
'historical_weekly_aggregations_done', 'task_id_1_53':
'agg_historical_weekly_calculat
e_metrics_47'}]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
10.41.6.131 - airflowdags [15/Jun/2023:15:01:02 +0000] "POST
/api/v1/dags/publish_pipeline/updateTaskInstancesState HTTP/1.1" 500 1560 "-"
"python-httpx/0.24.1"
[[34m2023-06-15 15:01:02,879[0m] {[34mmanager.py:[0m226} INFO[0m -
Updated user admin admin[0m
[[34m2023-06-15 15:01:03,842[0m] {[34mapp.py:[0m1741} ERROR[0m -
Exception on /api/v1/dags/publish_pipeline/updateTaskInstancesState [POST][0m
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
line 1802, in _execute_context
self.dialect.do_execute(
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py",
line 719, in do_execute
cursor.execute(statement, parameters)
psycopg2.errors.DeadlockDetected: deadlock detected
DETAIL: Process 19389 waits for ShareLock on transaction 34948119; blocked
by process 19388.
Process 19388 waits for ShareLock on transaction 34948109; blocked by
process 19389.
HINT: See server log for query details.
CONTEXT: while updating tuple (1379,12) in relation "dag_run"
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line
2525, in wsgi_app
response = self.full_dispatch_request()
File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line
1822, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line
1820, in full_dispatch_request
rv = self.dispatch_request()
File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line
1796, in dispatch_request
return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
File
"/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/decorator.py",
line 68, in wrapper
response = function(request)
File
"/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/uri_parsing.py",
line 149, in wrapper
response = function(request)
File
"/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/validation.py",
line 196, in wrapper
response = function(request)
File
"/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/validation.py",
line 399, in wrapper
return function(request)
File
"/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/response.py",
line 112, in wrapper
response = function(request)
File
"/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/parameter.py",
line 120, in wrapper
return function(**kwargs)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/api_connexion/security.py",
line 51, in decorated
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py",
line 75, in wrapper
return func(*args, session=session, **kwargs)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/api_connexion/endpoints/task_instance_endpoint.py",
line 539, in post_set_task_instances_state
tis = dag.set_task_instance_state(
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py",
line 72, in wrapper
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dag.py", line
1888, in set_task_instance_state
subdag.clear(
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py",
line 72, in wrapper
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dag.py", line
2051, in clear
clear_task_instances(
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py",
line 281, in clear_task_instances
session.flush()
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
line 3345, in flush
self._flush(objects)
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
line 3485, in _flush
transaction.rollback(_capture_exception=True)
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py",
line 70, in __exit__
compat.raise_(
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py",
line 207, in raise_
raise exception
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
line 3445, in _flush
flush_context.execute()
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py",
line 456, in execute
rec.execute(self)
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py",
line 630, in execute
util.preloaded.orm_persistence.save_obj(
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py",
line 236, in save_obj
_emit_update_statements(
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py",
line 1000, in _emit_update_statements
c = connection._execute_20(
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
line 1614, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py",
line 325, in _execute_on_connection
return connection._execute_clauseelement(
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
line 1481, in _execute_clauseelement
ret = self._execute_context(
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
line 1845, in _execute_context
self._handle_dbapi_exception(
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
line 2026, in _handle_dbapi_exception
util.raise_(
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py",
line 207, in raise_
raise exception
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
line 1802, in _execute_context
self.dialect.do_execute(
File
"/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py",
line 719, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock
detected
DETAIL: Process 19389 waits for ShareLock on transaction 34948119; blocked
by process 19388.
Process 19388 waits for ShareLock on transaction 34948109; blocked by
process 19389.
HINT: See server log for query details.
CONTEXT: while updating tuple (1379,12) in relation "dag_run"
[SQL: UPDATE dag_run SET queued_at=%(queued_at)s, start_date=%(start_date)s,
state=%(state)s, updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s]
[parameters: {'queued_at': datetime.datetime(2023, 6, 15, 15, 1, 2, 833204,
tzinfo=Timezone('UTC')), 'start_date': None, 'state': <DagRunState.QUEUED:
'queued'>, 'updated_at': datetime.datetime(2023, 6, 15, 15, 1, 2, 837800,
tzinfo=Timezone('UTC')), 'dag_run_id': 25068}]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
10.41.6.185 - airflowdags [15/Jun/2023:15:01:03 +0000] "POST
/api/v1/dags/publish_pipeline/updateTaskInstancesState HTTP/1.1" 500 1560 "-"
"python-httpx/0.24.1"
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]