easontm opened a new issue #16982:
URL: https://github.com/apache/airflow/issues/16982
**Apache Airflow version**: 2.1.1
**Kubernetes version (if you are using kubernetes)** (use `kubectl
version`): 1.18
**Environment**:
- **Cloud provider or hardware configuration**: AWS hosting a Kube cluster
- **OS**: Ubuntu 19.10
- **Kernel**: 4.14.225-169.362.amzn2.x86_64
- **Install tools**:
- **Others**: MySQL 8.0.23 on RDS
**What happened**:
In an unpredictable fashion, some tasks are unable to start. They do not
retry and they do not write to the shared log directory, but if I run `kubectl
logs <worker pod>` while the pod sits in the `Error` state afterward, I can see:
```
[2021-07-12 23:30:21,713] {dagbag.py:496} INFO - Filling up the DagBag from
/usr/local/airflow/dags/foo/bar.py
Running <TaskInstance: foo_bar.my_task 2021-07-12T22:30:00+00:00 [queued]>
on host <WORKER POD>
Traceback (most recent call last):
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py",
line 1277, in _execute_context
cursor, statement, parameters, context
File
"/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/default.py", line
608, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line
206, in execute
res = self._query(query)
File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line
319, in _query
db.query(q)
File "/usr/local/lib/python3.7/dist-packages/MySQLdb/connections.py", line
259, in query
_mysql.connection.query(self, query)
MySQLdb._exceptions.OperationalError: (1205, 'Lock wait timeout exceeded;
try restarting transaction')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 8, in <module>
sys.exit(main())
File "/usr/local/lib/python3.7/dist-packages/airflow/__main__.py", line
40, in main
args.func(args)
File "/usr/local/lib/python3.7/dist-packages/airflow/cli/cli_parser.py",
line 48, in command
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/dist-packages/airflow/utils/cli.py", line
91, in wrapper
return f(*args, **kwargs)
File
"/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py",
line 238, in task_run
_run_task_by_selected_method(args, dag, ti)
File
"/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py",
line 64, in _run_task_by_selected_method
_run_task_by_local_task_job(args, ti)
File
"/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py",
line 121, in _run_task_by_local_task_job
run_job.run()
File "/usr/local/lib/python3.7/dist-packages/airflow/jobs/base_job.py",
line 237, in run
self._execute()
File
"/usr/local/lib/python3.7/dist-packages/airflow/jobs/local_task_job.py", line
96, in _execute
pool=self.pool,
File "/usr/local/lib/python3.7/dist-packages/airflow/utils/session.py",
line 70, in wrapper
return func(*args, session=session, **kwargs)
File
"/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line
1023, in check_and_change_state_before_execution
self.refresh_from_db(session=session, lock_for_update=True)
File "/usr/local/lib/python3.7/dist-packages/airflow/utils/session.py",
line 67, in wrapper
return func(*args, **kwargs)
File
"/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line
623, in refresh_from_db
ti = qry.with_for_update().first()
<SQLALCHEMY TRACE OMITTED FOR BREVITY>
File "/usr/local/lib/python3.7/dist-packages/MySQLdb/connections.py", line
259, in query
_mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError)
(1205, 'Lock wait timeout exceeded; try restarting transaction')
[SQL: SELECT task_instance.try_number AS task_instance_try_number,
task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS
task_instance_dag_id, task_instance.execution_date AS
task_instance_execution_date, task_instance.start_date AS
task_instance_start_date, task_instance.end_date AS task_instance_end_date,
task_instance.duration AS task_instance_duration, task_instance.state AS
task_instance_state, task_instance.max_tries AS task_instance_max_tries,
task_instance.hostname AS task_instance_hostname, task_instance.unixname AS
task_instance_unixname, task_instance.job_id AS task_instance_job_id,
task_instance.pool AS task_instance_pool, task_instance.pool_slots AS
task_instance_pool_slots, task_instance.queue AS task_instance_queue,
task_instance.priority_weight AS task_instance_priority_weight,
task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS
task_instance_queued_dttm, task_instance.queued_by_job_id AS
task_instance_queued_by_job_id,
task_instance.pid AS task_instance_pid, task_instance.executor_config AS
task_instance_executor_config, task_instance.external_executor_id AS
task_instance_external_executor_id
FROM task_instance
WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND
task_instance.execution_date = %s
LIMIT %s FOR UPDATE]
[parameters: ('foobar', 'my_task', datetime.datetime(2021, 7, 12, 22, 30),
1)]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
```
<details>
<summary>Full length log</summary>

```
[2021-07-12 23:30:21,713] {dagbag.py:496} INFO - Filling up the DagBag from
/usr/local/airflow/dags/foo/bar.py
Running <TaskInstance: foo_bar.my_task 2021-07-12T22:30:00+00:00 [queued]>
on host <WORKER POD>
Traceback (most recent call last):
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py",
line 1277, in _execute_context
cursor, statement, parameters, context
File
"/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/default.py", line
608, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line
206, in execute
res = self._query(query)
File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line
319, in _query
db.query(q)
File "/usr/local/lib/python3.7/dist-packages/MySQLdb/connections.py", line
259, in query
_mysql.connection.query(self, query)
MySQLdb._exceptions.OperationalError: (1205, 'Lock wait timeout exceeded;
try restarting transaction')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 8, in <module>
sys.exit(main())
File "/usr/local/lib/python3.7/dist-packages/airflow/__main__.py", line
40, in main
args.func(args)
File "/usr/local/lib/python3.7/dist-packages/airflow/cli/cli_parser.py",
line 48, in command
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/dist-packages/airflow/utils/cli.py", line
91, in wrapper
return f(*args, **kwargs)
File
"/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py",
line 238, in task_run
_run_task_by_selected_method(args, dag, ti)
File
"/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py",
line 64, in _run_task_by_selected_method
_run_task_by_local_task_job(args, ti)
File
"/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py",
line 121, in _run_task_by_local_task_job
run_job.run()
File "/usr/local/lib/python3.7/dist-packages/airflow/jobs/base_job.py",
line 237, in run
self._execute()
File
"/usr/local/lib/python3.7/dist-packages/airflow/jobs/local_task_job.py", line
96, in _execute
pool=self.pool,
File "/usr/local/lib/python3.7/dist-packages/airflow/utils/session.py",
line 70, in wrapper
return func(*args, session=session, **kwargs)
File
"/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line
1023, in check_and_change_state_before_execution
self.refresh_from_db(session=session, lock_for_update=True)
File "/usr/local/lib/python3.7/dist-packages/airflow/utils/session.py",
line 67, in wrapper
return func(*args, **kwargs)
File
"/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line
623, in refresh_from_db
ti = qry.with_for_update().first()
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/orm/query.py",
line 3429, in first
ret = list(self[0:1])
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/orm/query.py",
line 3203, in __getitem__
return list(res)
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/orm/query.py",
line 3535, in __iter__
return self._execute_and_instances(context)
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/orm/query.py",
line 3560, in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py",
line 1011, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/sql/elements.py",
line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py",
line 1130, in _execute_clauseelement
distilled_params,
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py",
line 1317, in _execute_context
e, statement, parameters, cursor, context
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py",
line 1511, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/util/compat.py",
line 182, in raise_
raise exception
File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py",
line 1277, in _execute_context
cursor, statement, parameters, context
File
"/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/default.py", line
608, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line
206, in execute
res = self._query(query)
File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line
319, in _query
db.query(q)
File "/usr/local/lib/python3.7/dist-packages/MySQLdb/connections.py", line
259, in query
_mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError)
(1205, 'Lock wait timeout exceeded; try restarting transaction')
[SQL: SELECT task_instance.try_number AS task_instance_try_number,
task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS
task_instance_dag_id, task_instance.execution_date AS
task_instance_execution_date, task_instance.start_date AS
task_instance_start_date, task_instance.end_date AS task_instance_end_date,
task_instance.duration AS task_instance_duration, task_instance.state AS
task_instance_state, task_instance.max_tries AS task_instance_max_tries,
task_instance.hostname AS task_instance_hostname, task_instance.unixname AS
task_instance_unixname, task_instance.job_id AS task_instance_job_id,
task_instance.pool AS task_instance_pool, task_instance.pool_slots AS
task_instance_pool_slots, task_instance.queue AS task_instance_queue,
task_instance.priority_weight AS task_instance_priority_weight,
task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS
task_instance_queued_dttm, task_instance.queued_by_job_id AS
task_instance_queued_by_job_id,
task_instance.pid AS task_instance_pid, task_instance.executor_config AS
task_instance_executor_config, task_instance.external_executor_id AS
task_instance_external_executor_id
FROM task_instance
WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND
task_instance.execution_date = %s
LIMIT %s FOR UPDATE]
[parameters: ('foobar', 'my_task', datetime.datetime(2021, 7, 12, 22, 30),
1)]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
```
</details>
Afterward, the task is marked as Failed. The issue is transient, and tasks
can be manually rerun to try again.
**What you expected to happen**:
If the lock cannot be obtained, the task should exit more gracefully and be
rescheduled for another attempt, instead of being marked as failed with no retry.
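For illustration, here is a minimal sketch of the shape of that handling,
assuming the `qry` object from `refresh_from_db` in the traceback above; the
helper name, retry count, and backoff are hypothetical, not Airflow's actual API:
```
# Hypothetical sketch, not Airflow's actual code path: retry the
# SELECT ... FOR UPDATE on MySQL error 1205 instead of letting it
# propagate and fail the task without a retry.
import time

from sqlalchemy.exc import OperationalError

MYSQL_LOCK_WAIT_TIMEOUT = 1205  # error code seen in the traceback above


def locked_first_with_retry(qry, attempts=3, backoff_seconds=5):
    """Run qry.with_for_update().first(), retrying on lock wait timeouts."""
    for attempt in range(1, attempts + 1):
        try:
            return qry.with_for_update().first()
        except OperationalError as err:
            code = err.orig.args[0] if err.orig and err.orig.args else None
            if code != MYSQL_LOCK_WAIT_TIMEOUT or attempt == attempts:
                raise
            # Give the competing transaction time to commit, then retry.
            time.sleep(backoff_seconds * attempt)
```
In practice the surrounding session would also need a rollback before
retrying, since the failed statement poisons the open transaction; the sketch
only shows where a graceful retry could live instead of the hard failure.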
**How to reproduce it**:
You can trigger the non-graceful task failure by manually locking the row
and then trying to run the task; this works for any task. (A scripted
equivalent of these steps is sketched after the list.)
1. Connect to the MySQL instance backing Airflow
2. `SET autocommit = OFF;`
3. `START TRANSACTION;`
4. Lock the row
```
SELECT task_instance.try_number AS task_instance_try_number,
task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS
task_instance_dag_id, task_instance.execution_date AS
task_instance_execution_date, task_instance.start_date AS
task_instance_start_date, task_instance.end_date AS task_instance_end_date,
task_instance.duration AS task_instance_duration, task_instance.state AS
task_instance_state, task_instance.max_tries AS task_instance_max_tries,
task_instance.hostname AS task_instance_hostname, task_instance.unixname AS
task_instance_unixname, task_instance.job_id AS task_instance_job_id,
task_instance.pool AS task_instance_pool, task_instance.pool_slots AS
task_instance_pool_slots, task_instance.queue AS task_instance_queue,
task_instance.priority_weight AS task_instance_priority_weight,
task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS
task_instance_queued_dttm, task_instance.queued_by_job_id AS
task_instance_queued_by_job_id,
task_instance.pid AS task_instance_pid, task_instance.executor_config AS
task_instance_executor_config, task_instance.external_executor_id AS
task_instance_external_executor_id
FROM task_instance
WHERE task_instance.dag_id = 'foobar'
AND task_instance.task_id = 'my_task'
AND task_instance.execution_date = '2021-07-12 00:00:00.000000'
LIMIT 1 FOR UPDATE;
```
5. Try to run the task via the UI.
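For a scriptable equivalent of steps 1-4, the same lock can be held from
Python while you trigger the task (a sketch; the connection URI and the
dag/task/date values are placeholders, not values from my environment):
```
# Hypothetical repro helper: hold the task_instance row lock while a
# task run is triggered from the UI. The connection URI and the
# dag/task/date values are placeholders.
import time

from sqlalchemy import create_engine, text

engine = create_engine("mysql+mysqldb://user:pass@rds-host/airflow")

with engine.connect() as conn:
    trx = conn.begin()  # same effect as SET autocommit = OFF; START TRANSACTION
    conn.execute(
        text(
            "SELECT 1 FROM task_instance "
            "WHERE dag_id = :dag AND task_id = :task "
            "AND execution_date = :dt LIMIT 1 FOR UPDATE"
        ),
        {"dag": "foobar", "task": "my_task", "dt": "2021-07-12 00:00:00.000000"},
    )
    time.sleep(120)  # trigger the task via the UI inside this window
    trx.rollback()   # release the lock when done
```
Rolling back at the end releases the lock, so only runs triggered during the
sleep window hit the 1205 timeout.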
**Anything else we need to know**:
Ideally, the lock contention never occurs and the task executes normally;
however, the timeouts are seemingly random and I cannot replicate them. I
hypothesized that the scheduler was somehow spinning up two worker pods at the
same time, but if that were the case I would see two dead workers in `Error`
state when running `kubectl get pods`. The lock wait timeout occurs on fewer
than 1% of tasks, but when it does occur it consistently fails the task
without a retry.
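If it helps triage, the next time this happens in the wild the blocking
transaction could be captured with MySQL 8's lock-wait views. A diagnostic
sketch (the connection URI is a placeholder, and the account needs the
PROCESS privilege to see other sessions' queries):
```
# Hypothetical diagnostic: list blocked transactions and the ones
# blocking them via MySQL 8's lock-wait views. Connection URI is a
# placeholder.
from sqlalchemy import create_engine, text

engine = create_engine("mysql+mysqldb://user:pass@rds-host/airflow")

LOCK_WAITS = text("""
    SELECT r.trx_id AS waiting_trx, r.trx_query AS waiting_query,
           b.trx_id AS blocking_trx, b.trx_query AS blocking_query
    FROM performance_schema.data_lock_waits w
    JOIN information_schema.innodb_trx r
      ON r.trx_id = w.requesting_engine_transaction_id
    JOIN information_schema.innodb_trx b
      ON b.trx_id = w.blocking_engine_transaction_id
""")

with engine.connect() as conn:
    for row in conn.execute(LOCK_WAITS):
        print(row)
```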