easontm opened a new issue #16982:
URL: https://github.com/apache/airflow/issues/16982


   **Apache Airflow version**: 2.1.1
   
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl 
version`): 1.18
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**: AWS hosting a Kubernetes cluster
   - **OS**: Ubuntu 19.10
   - **Kernel**: 4.14.225-169.362.amzn2.x86_64
   - **Install tools**:
   - **Others**: MySQL 8.0.23 on RDS
   
   **What happened**:
   
   Unpredictably, some tasks fail to start. They do not retry and they do not write to the shared log directory, but if I run `kubectl logs <worker pod>` while the pod sits in the Error state afterward, I can see:
   ```
   [2021-07-12 23:30:21,713] {dagbag.py:496} INFO - Filling up the DagBag from 
/usr/local/airflow/dags/foo/bar.py
   Running <TaskInstance: foo_bar.my_task 2021-07-12T22:30:00+00:00 [queued]> 
on host <WORKER POD>
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", 
line 1277, in _execute_context
       cursor, statement, parameters, context
     File 
"/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/default.py", line 
608, in do_execute
       cursor.execute(statement, parameters)
     File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line 
206, in execute
       res = self._query(query)
     File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line 
319, in _query
       db.query(q)
     File "/usr/local/lib/python3.7/dist-packages/MySQLdb/connections.py", line 
259, in query
       _mysql.connection.query(self, query)
   MySQLdb._exceptions.OperationalError: (1205, 'Lock wait timeout exceeded; 
try restarting transaction')
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/usr/local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/usr/local/lib/python3.7/dist-packages/airflow/__main__.py", line 
40, in main
       args.func(args)
     File "/usr/local/lib/python3.7/dist-packages/airflow/cli/cli_parser.py", 
line 48, in command
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.7/dist-packages/airflow/utils/cli.py", line 
91, in wrapper
       return f(*args, **kwargs)
     File 
"/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py", 
line 238, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File 
"/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py", 
line 64, in _run_task_by_selected_method
       _run_task_by_local_task_job(args, ti)
     File 
"/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py", 
line 121, in _run_task_by_local_task_job
       run_job.run()
     File "/usr/local/lib/python3.7/dist-packages/airflow/jobs/base_job.py", 
line 237, in run
       self._execute()
     File 
"/usr/local/lib/python3.7/dist-packages/airflow/jobs/local_task_job.py", line 
96, in _execute
       pool=self.pool,
     File "/usr/local/lib/python3.7/dist-packages/airflow/utils/session.py", 
line 70, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 
1023, in check_and_change_state_before_execution
       self.refresh_from_db(session=session, lock_for_update=True)
     File "/usr/local/lib/python3.7/dist-packages/airflow/utils/session.py", 
line 67, in wrapper
       return func(*args, **kwargs)
     File 
"/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 
623, in refresh_from_db
       ti = qry.with_for_update().first()
    
   <SQLALCHEMY TRACE OMITTED FOR BREVITY>
   
     File "/usr/local/lib/python3.7/dist-packages/MySQLdb/connections.py", line 
259, in query
       _mysql.connection.query(self, query)
   sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) 
(1205, 'Lock wait timeout exceeded; try restarting transaction')
   [SQL: SELECT task_instance.try_number AS task_instance_try_number, 
task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS 
task_instance_dag_id, task_instance.execution_date AS 
task_instance_execution_date, task_instance.start_date AS 
task_instance_start_date, task_instance.end_date AS task_instance_end_date, 
task_instance.duration AS task_instance_duration, task_instance.state AS 
task_instance_state, task_instance.max_tries AS task_instance_max_tries, 
task_instance.hostname AS task_instance_hostname, task_instance.unixname AS 
task_instance_unixname, task_instance.job_id AS task_instance_job_id, 
task_instance.pool AS task_instance_pool, task_instance.pool_slots AS 
task_instance_pool_slots, task_instance.queue AS task_instance_queue, 
task_instance.priority_weight AS task_instance_priority_weight, 
task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS 
task_instance_queued_dttm, task_instance.queued_by_job_id AS 
task_instance_queued_by_job_id, 
 task_instance.pid AS task_instance_pid, task_instance.executor_config AS 
task_instance_executor_config, task_instance.external_executor_id AS 
task_instance_external_executor_id
   FROM task_instance
   WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND 
task_instance.execution_date = %s
    LIMIT %s FOR UPDATE]
   [parameters: ('foobar', 'my_task', datetime.datetime(2021, 7, 12, 22, 30), 
1)]
   (Background on this error at: http://sqlalche.me/e/13/e3q8)
   ```
   
   <details>
   <summary>Full length log</summary>
   
   
   ```
   [2021-07-12 23:30:21,713] {dagbag.py:496} INFO - Filling up the DagBag from 
/usr/local/airflow/dags/foo/bar.py
   Running <TaskInstance: foo_bar.my_task 2021-07-12T22:30:00+00:00 [queued]> 
on host <WORKER POD>
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", 
line 1277, in _execute_context
       cursor, statement, parameters, context
     File 
"/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/default.py", line 
608, in do_execute
       cursor.execute(statement, parameters)
     File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line 
206, in execute
       res = self._query(query)
     File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line 
319, in _query
       db.query(q)
     File "/usr/local/lib/python3.7/dist-packages/MySQLdb/connections.py", line 
259, in query
       _mysql.connection.query(self, query)
   MySQLdb._exceptions.OperationalError: (1205, 'Lock wait timeout exceeded; 
try restarting transaction')
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/usr/local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/usr/local/lib/python3.7/dist-packages/airflow/__main__.py", line 
40, in main
       args.func(args)
     File "/usr/local/lib/python3.7/dist-packages/airflow/cli/cli_parser.py", 
line 48, in command
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.7/dist-packages/airflow/utils/cli.py", line 
91, in wrapper
       return f(*args, **kwargs)
     File 
"/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py", 
line 238, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File 
"/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py", 
line 64, in _run_task_by_selected_method
       _run_task_by_local_task_job(args, ti)
     File 
"/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py", 
line 121, in _run_task_by_local_task_job
       run_job.run()
     File "/usr/local/lib/python3.7/dist-packages/airflow/jobs/base_job.py", 
line 237, in run
       self._execute()
     File 
"/usr/local/lib/python3.7/dist-packages/airflow/jobs/local_task_job.py", line 
96, in _execute
       pool=self.pool,
     File "/usr/local/lib/python3.7/dist-packages/airflow/utils/session.py", 
line 70, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 
1023, in check_and_change_state_before_execution
       self.refresh_from_db(session=session, lock_for_update=True)
     File "/usr/local/lib/python3.7/dist-packages/airflow/utils/session.py", 
line 67, in wrapper
       return func(*args, **kwargs)
     File 
"/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 
623, in refresh_from_db
       ti = qry.with_for_update().first()
     File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/orm/query.py", 
line 3429, in first
       ret = list(self[0:1])
     File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/orm/query.py", 
line 3203, in __getitem__
       return list(res)
     File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/orm/query.py", 
line 3535, in __iter__
       return self._execute_and_instances(context)
     File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/orm/query.py", 
line 3560, in _execute_and_instances
       result = conn.execute(querycontext.statement, self._params)
     File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", 
line 1011, in execute
       return meth(self, multiparams, params)
     File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/sql/elements.py", 
line 298, in _execute_on_connection
       return connection._execute_clauseelement(self, multiparams, params)
     File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", 
line 1130, in _execute_clauseelement
       distilled_params,
     File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", 
line 1317, in _execute_context
       e, statement, parameters, cursor, context
     File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", 
line 1511, in _handle_dbapi_exception
       sqlalchemy_exception, with_traceback=exc_info[2], from_=e
     File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/util/compat.py", 
line 182, in raise_
       raise exception
     File "/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/base.py", 
line 1277, in _execute_context
       cursor, statement, parameters, context
     File 
"/usr/local/lib/python3.7/dist-packages/sqlalchemy/engine/default.py", line 
608, in do_execute
       cursor.execute(statement, parameters)
     File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line 
206, in execute
       res = self._query(query)
     File "/usr/local/lib/python3.7/dist-packages/MySQLdb/cursors.py", line 
319, in _query
       db.query(q)
     File "/usr/local/lib/python3.7/dist-packages/MySQLdb/connections.py", line 
259, in query
       _mysql.connection.query(self, query)
   sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) 
(1205, 'Lock wait timeout exceeded; try restarting transaction')
   [SQL: SELECT task_instance.try_number AS task_instance_try_number, 
task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS 
task_instance_dag_id, task_instance.execution_date AS 
task_instance_execution_date, task_instance.start_date AS 
task_instance_start_date, task_instance.end_date AS task_instance_end_date, 
task_instance.duration AS task_instance_duration, task_instance.state AS 
task_instance_state, task_instance.max_tries AS task_instance_max_tries, 
task_instance.hostname AS task_instance_hostname, task_instance.unixname AS 
task_instance_unixname, task_instance.job_id AS task_instance_job_id, 
task_instance.pool AS task_instance_pool, task_instance.pool_slots AS 
task_instance_pool_slots, task_instance.queue AS task_instance_queue, 
task_instance.priority_weight AS task_instance_priority_weight, 
task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS 
task_instance_queued_dttm, task_instance.queued_by_job_id AS 
task_instance_queued_by_job_id, 
 task_instance.pid AS task_instance_pid, task_instance.executor_config AS 
task_instance_executor_config, task_instance.external_executor_id AS 
task_instance_external_executor_id
   FROM task_instance
   WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND 
task_instance.execution_date = %s
    LIMIT %s FOR UPDATE]
   [parameters: ('foobar', 'my_task', datetime.datetime(2021, 7, 12, 22, 30), 
1)]
   (Background on this error at: http://sqlalche.me/e/13/e3q8)
   ```
   </details>
   
   Afterward, the task is marked as Failed. The issue is transient, and tasks 
can be manually rerun to try again.
   
   
   **What you expected to happen**:
   
   If the row lock cannot be obtained, the task should fail more gracefully and be rescheduled, rather than being marked Failed with no retry.
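   
   For illustration, a wrapper along these lines around the locking query would turn the hard failure into a bounded retry. This is a minimal sketch, not Airflow's actual code; the helper name and retry parameters are hypothetical:
   
   ```python
   import time
   
   from sqlalchemy.exc import OperationalError
   
   MYSQL_LOCK_WAIT_TIMEOUT = 1205  # the error code seen in the log above
   
   
   def with_lock_retry(fn, attempts=3, backoff_seconds=5.0):
       """Call `fn`, retrying when MySQL reports a lock wait timeout
       (error 1205) instead of letting it fail the task outright."""
       for attempt in range(1, attempts + 1):
           try:
               return fn()
           except OperationalError as exc:
               code = exc.orig.args[0] if exc.orig is not None else None
               if code != MYSQL_LOCK_WAIT_TIMEOUT or attempt == attempts:
                   raise  # unrelated error, or retries exhausted
               time.sleep(backoff_seconds * attempt)  # back off before retrying
   ```
   
   In the traceback above, the failing call is `refresh_from_db(session=session, lock_for_update=True)`; that is the kind of call such a wrapper would guard.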
   
   **How to reproduce it**:
   
   You can trigger the non-graceful failure on any task by manually locking its `task_instance` row and then trying to run the task (a scripted variant of these steps is sketched below):
   
   1. Connect to the MySQL instance backing Airflow
   2. `SET autocommit = OFF;`
   3. `START TRANSACTION;`
   4. Lock the row:
   ```
   SELECT task_instance.try_number AS task_instance_try_number,
       task_instance.task_id AS task_instance_task_id,
       task_instance.dag_id AS task_instance_dag_id,
       task_instance.execution_date AS task_instance_execution_date,
       task_instance.start_date AS task_instance_start_date,
       task_instance.end_date AS task_instance_end_date,
       task_instance.duration AS task_instance_duration,
       task_instance.state AS task_instance_state,
       task_instance.max_tries AS task_instance_max_tries,
       task_instance.hostname AS task_instance_hostname,
       task_instance.unixname AS task_instance_unixname,
       task_instance.job_id AS task_instance_job_id,
       task_instance.pool AS task_instance_pool,
       task_instance.pool_slots AS task_instance_pool_slots,
       task_instance.queue AS task_instance_queue,
       task_instance.priority_weight AS task_instance_priority_weight,
       task_instance.operator AS task_instance_operator,
       task_instance.queued_dttm AS task_instance_queued_dttm,
       task_instance.queued_by_job_id AS task_instance_queued_by_job_id,
       task_instance.pid AS task_instance_pid,
       task_instance.executor_config AS task_instance_executor_config,
       task_instance.external_executor_id AS task_instance_external_executor_id
   FROM task_instance
   WHERE task_instance.dag_id = 'foobar'
       AND task_instance.task_id = 'my_task'
       AND task_instance.execution_date = '2021-07-12 00:00:00.000000'
   LIMIT 1 FOR UPDATE;
   ```
   5. Try to run the task via the UI.
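   
   The lock-holding side (steps 1-4) can also be scripted. Here is a minimal sketch using MySQLdb; the connection details are placeholders for the RDS instance backing Airflow, and the hold time is arbitrary:
   
   ```python
   import time
   
   import MySQLdb  # mysqlclient, the same driver shown in the traceback
   
   # Placeholder connection details.
   conn = MySQLdb.connect(host="airflow-db.example.com", user="airflow",
                          passwd="***", db="airflow")
   conn.autocommit(False)  # equivalent to SET autocommit = OFF
   cur = conn.cursor()
   # Take the same row lock the task runner needs (step 4 above).
   cur.execute(
       "SELECT 1 FROM task_instance"
       " WHERE dag_id = %s AND task_id = %s AND execution_date = %s"
       " LIMIT 1 FOR UPDATE",
       ("foobar", "my_task", "2021-07-12 00:00:00.000000"),
   )
   time.sleep(120)  # hold the lock well past innodb_lock_wait_timeout (50s by default)
   conn.rollback()  # release the lock
   conn.close()
   ```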
   
   
   **Anything else we need to know**:
   
   Ideally the deadlock would never occur and the task would execute normally; however, the deadlocks are seemingly random and I cannot replicate them. I hypothesized that the scheduler was somehow spinning up two worker pods at the same time, but if that were the case I would expect `kubectl get pods` to show two dead workers in the `Error` state. The deadlock occurs on fewer than 1% of tasks, but when it does, it consistently fails the task without a retry.
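   
   If it is useful for triage: the next time a worker hangs on this lock, the blocker can be identified from the MySQL side while the wait is in progress, e.g. by dumping the lock-wait view. A sketch, assuming the `sys` schema is available and reusing the placeholder connection details above:
   
   ```python
   import MySQLdb
   
   conn = MySQLdb.connect(host="airflow-db.example.com", user="airflow",
                          passwd="***", db="airflow")
   cur = conn.cursor()
   # sys.innodb_lock_waits pairs each waiting transaction with the
   # transaction blocking it, including the SQL on each side.
   cur.execute("SELECT * FROM sys.innodb_lock_waits")
   names = [col[0] for col in cur.description]
   for row in cur.fetchall():
       for name, value in zip(names, row):
           print(f"{name}: {value}")
       print("-" * 40)
   conn.close()
   ```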

