dstaple commented on issue #23361: URL: https://github.com/apache/airflow/issues/23361#issuecomment-1183497072
We have figured out the origins of the SELECT ... FOR UPDATE and a mechanism for the deadlocks. The short story is that it originates from the `airflow tasks run` CLI command inside task pods. The SELECT does indeed originate from `TaskInstance.refresh_from_db()` as suggested above. It is called as follows: ``` airflow/jobs/local_task_job.py:89 _execute airflow/models/taskinstance.py:1184: check_and_change_state_before_execution airflow/models/taskinstance.py:714 refresh_from_db(lock_for_update=True, session=session) ``` Line numbers both in the synopsis above and the stack trace below are for Airflow 2.2.5. Stack traces including the SELECT statements can be found in failed pod logs; I have included one below: ``` Traceback (most recent call last): File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context cursor, statement, parameters, context psycopg2.errors.InFailedSqlTransaction: current transaction is aborted, commands ignored until end of transaction block File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 298, in task_run self._execute() File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1184, in check_and_change_state_before_execution File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/utils/session.py", line 67, in wrapper File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 734, in refresh_from_db for attempt in run_with_db_retries(logger=self.log): File "/opt/airflow/venv-py3/lib/python3.7/site-packages/tenacity/__init__.py", line 390, in __iter__ File "/opt/airflow/venv-py3/lib/python3.7/site-packages/tenacity/__init__.py", line 368, in iter File "/opt/airflow/venv-py3/lib/python3.7/site-packages/tenacity/__init__.py", line 186, in reraise File
"/opt/python3.7/lib64/python3.7/concurrent/futures/_base.py", line 428, in result return self.__get_result() File "/opt/python3.7/lib64/python3.7/concurrent/futures/_base.py", line 384, in __get_result File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 736, in refresh_from_db File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3203, in __getitem__ File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_ File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 609, in do_execute sqlalchemy.exc.InternalError: (psycopg2.errors.InFailedSqlTransaction) current transaction is aborted, commands ignored until end of transaction block [SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS 
task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs, dag_run_1.state AS dag_run_1_state, dag_run_1.id AS dag_run_1_id, dag_run_1.dag_id AS dag_run_1_dag_id, dag_run_1.queued_at AS dag_run_1_queued_at, dag_run_1.execution_date AS dag_run_1_execution_date, dag_run_1.start_date AS dag_run_1_start_date, dag_run_1.end_date AS dag_run_1_end_date, dag_run_1.run_id AS dag_run_1_run_id, dag_run_1.creating_job_id AS dag_run_1_creating_job_id, dag_run_1.external_trigger AS dag_run_1_external_trigger, dag_run_1.run_type AS dag_run_1_run_type, dag_run_1.conf AS dag_run_1_conf, dag_run_1.data_interval_start AS dag_run_1_data_interval_start, dag_run_1.data_interval_end AS dag_run_1_data_interval_end, dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision, dag_run_1.dag_hash AS dag_run_1_dag_hash WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.task_id = %(task_id_1)s AND task_instance.run_id = %(run_id_1)s LIMIT %(param_1)s FOR UPDATE] [parameters: {'dag_id_1': 'sanitized_dag_name_1', 'task_id_1': 'sanitized_task_name_1', 'run_id_1': 'sanitized_run_id_1', 'param_1': 1}] (Background on this error at: http://sqlalche.me/e/13/2j85) File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 609, in
do_execute cursor.execute(statement, parameters) The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/opt/airflow/venv-py3/bin/airflow", line 8, in <module> sys.exit(main()) File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/__main__.py", line 48, in main args.func(args) File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command return func(*args, **kwargs) File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper return f(*args, **kwargs) _run_task_by_selected_method(args, dag, ti) File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 105, in _run_task_by_selected_method _run_task_by_local_task_job(args, ti) File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 163, in _run_task_by_local_task_job run_job.run() File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 246, in run File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/local_task_job.py", line 97, in _execute external_executor_id=self.external_executor_id, return func(*args, session=session, **kwargs) self.refresh_from_db(session=session, lock_for_update=True) return func(*args, **kwargs) do = self.iter(retry_state=retry_state) raise retry_exc.reraise() raise self.last_attempt.result() raise self._exception ti: Optional[TaskInstance] = qry.with_for_update().first() File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3429, in first ret = list(self[0:1]) return list(res) File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__ return self._execute_and_instances(context) result = conn.execute(querycontext.statement, self._params) return meth(self, multiparams, params) File 
"/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection return connection._execute_clauseelement(self, multiparams, params) distilled_params, e, statement, parameters, cursor, context File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception sqlalchemy_exception, with_traceback=exc_info[2], from_=e raise exception cursor, statement, parameters, context cursor.execute(statement, parameters) FROM task_instance JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
