dstaple commented on issue #23361:
URL: https://github.com/apache/airflow/issues/23361#issuecomment-1183497072

   We have figured out where the `SELECT ... FOR UPDATE` comes from, and a mechanism for the deadlocks.
   
   In short, it is issued by the `airflow tasks run` CLI command inside task pods.
   
   The SELECT does indeed originate from `TaskInstance.refresh_from_db()` as 
suggested above. It is called as follows:
   ```
   airflow/jobs/local_task_job.py:89 _execute
       airflow/models/taskinstance.py:1184 check_and_change_state_before_execution
           airflow/models/taskinstance.py:714 refresh_from_db(lock_for_update=True, session=session)
   ```
   Line numbers both in the synopsis above and the stack trace below are for 
Airflow 2.2.5.
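
To illustrate how a `lock_for_update=True` style call ends up as `SELECT ... FOR UPDATE`, here is a minimal sketch using SQLAlchemy Core. The cut-down `task_instance` table and the `build_locked_select` helper are assumptions made up for this example, not Airflow's actual model or code; only `with_for_update()` and the PostgreSQL dialect compilation are real SQLAlchemy calls. Nothing connects to a database.

```python
from sqlalchemy import Column, MetaData, String, Table
from sqlalchemy.dialects import postgresql

# Hypothetical, cut-down stand-in for Airflow's task_instance table.
metadata = MetaData()
task_instance = Table(
    "task_instance",
    metadata,
    Column("dag_id", String, primary_key=True),
    Column("task_id", String, primary_key=True),
    Column("run_id", String, primary_key=True),
    Column("state", String),
)


def build_locked_select(dag_id, task_id, run_id):
    """Build a single-row SELECT that asks the database for a row lock."""
    return (
        task_instance.select()
        .where(task_instance.c.dag_id == dag_id)
        .where(task_instance.c.task_id == task_id)
        .where(task_instance.c.run_id == run_id)
        .limit(1)
        .with_for_update()  # rendered as a trailing FOR UPDATE on PostgreSQL
    )


stmt = build_locked_select(
    "sanitized_dag_name_1", "sanitized_task_name_1", "sanitized_run_id_1"
)
# Compile against the PostgreSQL dialect only; no connection is opened.
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

The printed statement should have the same overall shape as the one in the stack trace below: a `WHERE` on `dag_id`, `task_id` and `run_id`, a `LIMIT`, and a trailing `FOR UPDATE`.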
   
   Stack traces that include the SELECT statement can be found in the logs of failed pods; one is included below:
   ```
   Traceback (most recent call last):
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
       cursor, statement, parameters, context
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 609, in do_execute
       cursor.execute(statement, parameters)
   psycopg2.errors.InFailedSqlTransaction: current transaction is aborted, commands ignored until end of transaction block
   
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/opt/airflow/venv-py3/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/__main__.py", line 48, in main
       args.func(args)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
       return f(*args, **kwargs)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 298, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 105, in _run_task_by_selected_method
       _run_task_by_local_task_job(args, ti)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 163, in _run_task_by_local_task_job
       run_job.run()
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 246, in run
       self._execute()
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/local_task_job.py", line 97, in _execute
       external_executor_id=self.external_executor_id,
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
       return func(*args, session=session, **kwargs)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1184, in check_and_change_state_before_execution
       self.refresh_from_db(session=session, lock_for_update=True)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/utils/session.py", line 67, in wrapper
       return func(*args, **kwargs)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 734, in refresh_from_db
       for attempt in run_with_db_retries(logger=self.log):
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/tenacity/__init__.py", line 390, in __iter__
       do = self.iter(retry_state=retry_state)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/tenacity/__init__.py", line 368, in iter
       raise retry_exc.reraise()
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/tenacity/__init__.py", line 186, in reraise
       raise self.last_attempt.result()
     File "/opt/python3.7/lib64/python3.7/concurrent/futures/_base.py", line 428, in result
       return self.__get_result()
     File "/opt/python3.7/lib64/python3.7/concurrent/futures/_base.py", line 384, in __get_result
       raise self._exception
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 736, in refresh_from_db
       ti: Optional[TaskInstance] = qry.with_for_update().first()
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3429, in first
       ret = list(self[0:1])
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3203, in __getitem__
       return list(res)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
       return self._execute_and_instances(context)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances
       result = conn.execute(querycontext.statement, self._params)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
       return meth(self, multiparams, params)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
       return connection._execute_clauseelement(self, multiparams, params)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
       distilled_params,
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
       e, statement, parameters, cursor, context
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
       sqlalchemy_exception, with_traceback=exc_info[2], from_=e
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
       cursor, statement, parameters, context
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 609, in do_execute
       cursor.execute(statement, parameters)
   sqlalchemy.exc.InternalError: (psycopg2.errors.InFailedSqlTransaction) current transaction is aborted, commands ignored until end of transaction block
   
   [SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs, dag_run_1.state AS dag_run_1_state, dag_run_1.id AS dag_run_1_id, dag_run_1.dag_id AS dag_run_1_dag_id, dag_run_1.queued_at AS dag_run_1_queued_at, dag_run_1.execution_date AS dag_run_1_execution_date, dag_run_1.start_date AS dag_run_1_start_date, dag_run_1.end_date AS dag_run_1_end_date, dag_run_1.run_id AS dag_run_1_run_id, dag_run_1.creating_job_id AS dag_run_1_creating_job_id, dag_run_1.external_trigger AS dag_run_1_external_trigger, dag_run_1.run_type AS dag_run_1_run_type, dag_run_1.conf AS dag_run_1_conf, dag_run_1.data_interval_start AS dag_run_1_data_interval_start, dag_run_1.data_interval_end AS dag_run_1_data_interval_end, dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision, dag_run_1.dag_hash AS dag_run_1_dag_hash
   FROM task_instance JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
   WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.task_id = %(task_id_1)s AND task_instance.run_id = %(run_id_1)s
    LIMIT %(param_1)s FOR UPDATE]
   [parameters: {'dag_id_1': 'sanitized_dag_name_1', 'task_id_1': 'sanitized_task_name_1', 'run_id_1': 'sanitized_run_id_1', 'param_1': 1}]
   (Background on this error at: http://sqlalche.me/e/13/2j85)
   ```
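
As a reading aid for the log above: it contains two tracebacks joined by "The above exception was the direct cause of the following exception", which is what Python prints when one exception is raised with an explicit cause. The sketch below reproduces that shape with the standard library only. `DriverError`, `WrappedError` and the function names are hypothetical stand-ins, not psycopg2 or SQLAlchemy classes.

```python
import traceback


class DriverError(Exception):
    """Hypothetical stand-in for a low-level database driver error."""


class WrappedError(Exception):
    """Hypothetical stand-in for a higher-level wrapper error."""


def run_statement():
    # The low-level failure, analogous to the first traceback in the log.
    raise DriverError("current transaction is aborted")


def execute():
    try:
        run_statement()
    except DriverError as exc:
        # An explicit cause makes Python print both tracebacks, cause first.
        raise WrappedError("statement failed") from exc


def format_chain():
    """Return the wrapper exception and its fully formatted traceback text."""
    try:
        execute()
    except WrappedError as err:
        text = "".join(
            traceback.format_exception(type(err), err, err.__traceback__)
        )
        return err, text


err, text = format_chain()
print(text)
```

The underlying failure is therefore the first traceback, and the second one shows the full call path that led to it.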

