potiuk opened a new issue #11788: URL: https://github.com/apache/airflow/issues/11788
Scheduler seems to be stuck and finally exits with a deadlock error (in MySQL)
in the current master setup in Breeze
**What happened**:
When you run the example DAGs in Breeze on current master, you get a "missing
scheduler" warning and the scheduler seems to be stuck. In MySQL it also
times out with a deadlock after a while. In Postgres it seems to be stuck
indefinitely.
**How to reproduce it**:
`./breeze start-airflow --backend mysql --db-reset --load-example-dags`
or
`./breeze start-airflow --backend postgres --db-reset --load-example-dags`
When you try to run the example DAGs they seem to start initially, but they are
left in the "SCHEDULED" state and do not continue to run. The scheduler log
stops at:
```
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
[2020-10-23 16:53:08,823] {scheduler_job.py:1270} INFO - Starting the
scheduler
[2020-10-23 16:53:08,823] {scheduler_job.py:1275} INFO - Processing each
file at most -1 times
[2020-10-23 16:53:08,824] {scheduler_job.py:1297} INFO - Resetting orphaned
tasks for active dag runs
[2020-10-23 16:53:08,831] {dag_processing.py:250} INFO - Launched
DagFileProcessorManager with pid: 504
[2020-10-23 16:53:08,836] {settings.py:49} INFO - Configured default
timezone Timezone('UTC')
/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py:593:
Warning: (1300, "Invalid utf8mb4 character string: '800495'")
cursor.execute(statement, parameters)
/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py:322: Warning:
(1300, "Invalid utf8mb4 character string: '800495'")
rows += self.execute(sql + postfix)
[2020-10-23 16:53:29,490] {scheduler_job.py:976} INFO - 4 tasks up for
execution:
<TaskInstance: example_bash_operator.also_run_this 2020-10-21
00:00:00+00:00 [scheduled]>
<TaskInstance: example_bash_operator.runme_0 2020-10-21
00:00:00+00:00 [scheduled]>
<TaskInstance: example_bash_operator.runme_1 2020-10-21
00:00:00+00:00 [scheduled]>
<TaskInstance: example_bash_operator.runme_2 2020-10-21
00:00:00+00:00 [scheduled]>
[2020-10-23 16:53:29,491] {scheduler_job.py:1011} INFO - Figuring out tasks
to run in Pool(name=default_pool) with 128 open slots and 4 task instances
ready to be queued
[2020-10-23 16:53:29,491] {scheduler_job.py:1038} INFO - DAG
example_bash_operator has 0/16 running and queued tasks
[2020-10-23 16:53:29,491] {scheduler_job.py:1038} INFO - DAG
example_bash_operator has 1/16 running and queued tasks
[2020-10-23 16:53:29,491] {scheduler_job.py:1038} INFO - DAG
example_bash_operator has 2/16 running and queued tasks
[2020-10-23 16:53:29,491] {scheduler_job.py:1038} INFO - DAG
example_bash_operator has 3/16 running and queued tasks
[2020-10-23 16:53:29,491] {scheduler_job.py:1090} INFO - Setting the
following tasks to queued state:
<TaskInstance: example_bash_operator.runme_0 2020-10-21
00:00:00+00:00 [scheduled]>
<TaskInstance: example_bash_operator.runme_1 2020-10-21
00:00:00+00:00 [scheduled]>
<TaskInstance: example_bash_operator.runme_2 2020-10-21
00:00:00+00:00 [scheduled]>
<TaskInstance: example_bash_operator.also_run_this 2020-10-21
00:00:00+00:00 [scheduled]>
[2020-10-23 16:53:29,493] {scheduler_job.py:1137} INFO - Sending
TaskInstanceKey(dag_id='example_bash_operator', task_id='runme_0',
execution_date=datetime.datetime(2020, 10, 21, 0, 0, tzinfo=Timezone('UTC')),
try_number=1) to executor with priority 3 and queue default
[2020-10-23 16:53:29,493] {base_executor.py:78} INFO - Adding to queue:
['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_0',
'2020-10-21T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir',
'/opt/airflow/airflow/example_dags/example_bash_operator.py']
[2020-10-23 16:53:29,493] {scheduler_job.py:1137} INFO - Sending
TaskInstanceKey(dag_id='example_bash_operator', task_id='runme_1',
execution_date=datetime.datetime(2020, 10, 21, 0, 0, tzinfo=Timezone('UTC')),
try_number=1) to executor with priority 3 and queue default
[2020-10-23 16:53:29,494] {base_executor.py:78} INFO - Adding to queue:
['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_1',
'2020-10-21T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir',
'/opt/airflow/airflow/example_dags/example_bash_operator.py']
[2020-10-23 16:53:29,494] {scheduler_job.py:1137} INFO - Sending
TaskInstanceKey(dag_id='example_bash_operator', task_id='runme_2',
execution_date=datetime.datetime(2020, 10, 21, 0, 0, tzinfo=Timezone('UTC')),
try_number=1) to executor with priority 3 and queue default
[2020-10-23 16:53:29,494] {base_executor.py:78} INFO - Adding to queue:
['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_2',
'2020-10-21T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir',
'/opt/airflow/airflow/example_dags/example_bash_operator.py']
[2020-10-23 16:53:29,494] {scheduler_job.py:1137} INFO - Sending
TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this',
execution_date=datetime.datetime(2020, 10, 21, 0, 0, tzinfo=Timezone('UTC')),
try_number=1) to executor with priority 2 and queue default
[2020-10-23 16:53:29,494] {base_executor.py:78} INFO - Adding to queue:
['airflow', 'tasks', 'run', 'example_bash_operator', 'also_run_this',
'2020-10-21T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir',
'/opt/airflow/airflow/example_dags/example_bash_operator.py']
[2020-10-23 16:53:29,494] {sequential_executor.py:57} INFO - Executing
command: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_0',
'2020-10-21T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir',
'/opt/airflow/airflow/example_dags/example_bash_operator.py']
[2020-10-23 16:53:31,139] {dagbag.py:436} INFO - Filling up the DagBag from
/opt/airflow/airflow/example_dags/example_bash_operator.py
```
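The stuck state can be confirmed directly in the metadata database. Below is a minimal sketch using SQLite and a simplified stand-in for the `task_instance` table (not the real Airflow schema) to show the kind of query that lists instances left in the `scheduled` state:

```python
import sqlite3

# Hypothetical, reduced version of Airflow's task_instance table,
# kept only to the columns needed for this check.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, "
    "execution_date TEXT, state TEXT)"
)
rows = [
    ("example_bash_operator", "runme_0", "2020-10-21T00:00:00+00:00", "scheduled"),
    ("example_bash_operator", "runme_1", "2020-10-21T00:00:00+00:00", "scheduled"),
    ("example_bash_operator", "also_run_this", "2020-10-21T00:00:00+00:00", "scheduled"),
]
conn.executemany("INSERT INTO task_instance VALUES (?, ?, ?, ?)", rows)

# Tasks the scheduler queued but never progressed stay in 'scheduled'.
stuck = conn.execute(
    "SELECT task_id FROM task_instance "
    "WHERE dag_id = ? AND state = ? ORDER BY task_id",
    ("example_bash_operator", "scheduled"),
).fetchall()
print([t for (t,) in stuck])  # → ['also_run_this', 'runme_0', 'runme_1']
```

Running the equivalent query against the real metadata DB shows the four tasks from the log never leaving `scheduled`.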
In MySQL, a lock wait timeout exception is additionally raised after a while:
```
Running <TaskInstance: example_bash_operator.runme_1
2020-10-21T00:00:00+00:00 [scheduled]> on host 7216b1a127c3
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
line 1277, in _execute_context
cursor, statement, parameters, context
File
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line
593, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line
255, in execute
self.errorhandler(self, exc, value)
File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", line
50, in defaulterrorhandler
raise errorvalue
File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line
252, in execute
res = self._query(query)
File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line
378, in _query
db.query(q)
File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", line
280, in query
_mysql.connection.query(self, query)
_mysql_exceptions.OperationalError: (1205, 'Lock wait timeout exceeded; try
restarting transaction')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 33, in <module>
sys.exit(load_entry_point('apache-airflow', 'console_scripts',
'airflow')())
File "/opt/airflow/airflow/__main__.py", line 40, in main
args.func(args)
File "/opt/airflow/airflow/cli/cli_parser.py", line 53, in command
return func(*args, **kwargs)
File "/opt/airflow/airflow/utils/cli.py", line 84, in wrapper
return f(*args, **kwargs)
File "/opt/airflow/airflow/cli/commands/task_command.py", line 206, in
task_run
_run_task_by_selected_method(args, dag, ti)
File "/opt/airflow/airflow/cli/commands/task_command.py", line 59, in
_run_task_by_selected_method
_run_task_by_local_task_job(args, ti)
File "/opt/airflow/airflow/cli/commands/task_command.py", line 113, in
_run_task_by_local_task_job
run_job.run()
File "/opt/airflow/airflow/jobs/base_job.py", line 245, in run
self._execute()
File "/opt/airflow/airflow/jobs/local_task_job.py", line 91, in _execute
pool=self.pool):
File "/opt/airflow/airflow/utils/session.py", line 63, in wrapper
return func(*args, **kwargs)
File "/opt/airflow/airflow/models/taskinstance.py", line 945, in
check_and_change_state_before_execution
self.refresh_from_db(session=session, lock_for_update=True)
File "/opt/airflow/airflow/utils/session.py", line 59, in wrapper
return func(*args, **kwargs)
File "/opt/airflow/airflow/models/taskinstance.py", line 534, in
refresh_from_db
ti = qry.with_for_update().first()
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py",
line 3402, in first
ret = list(self[0:1])
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py",
line 3176, in __getitem__
return list(res)
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py",
line 3508, in __iter__
return self._execute_and_instances(context)
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py",
line 3533, in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
line 1011, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py",
line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
line 1130, in _execute_clauseelement
distilled_params,
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
line 1317, in _execute_context
e, statement, parameters, cursor, context
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
line 1511, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py",
line 182, in raise_
raise exception
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",
line 1277, in _execute_context
cursor, statement, parameters, context
File
"/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line
593, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line
255, in execute
self.errorhandler(self, exc, value)
File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", line
50, in defaulterrorhandler
raise errorvalue
File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line
252, in execute
res = self._query(query)
File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line
378, in _query
db.query(q)
File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", line
280, in query
_mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) (1205,
'Lock wait timeout exceeded; try restarting transaction')
[SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id
FROM task_instance
WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.execution_date = %s
LIMIT %s FOR UPDATE]
[parameters: ('example_bash_operator', 'runme_1', datetime.datetime(2020, 10, 21, 0, 0), 1)]
```
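The failing statement comes from `TaskInstance.refresh_from_db(lock_for_update=True)`, which issues a `SELECT ... FOR UPDATE` row lock via SQLAlchemy's `with_for_update()`. A minimal sketch of that pattern follows, using a reduced stand-in model and SQLite (where `FOR UPDATE` is a no-op, so this only illustrates the query shape, not the blocking behaviour seen above):

```python
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class TaskInstance(Base):
    # Reduced, hypothetical stand-in for Airflow's task_instance model.
    __tablename__ = "task_instance"
    dag_id = Column(String, primary_key=True)
    task_id = Column(String, primary_key=True)
    state = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(TaskInstance(dag_id="example_bash_operator",
                             task_id="runme_1", state="scheduled"))
    session.commit()

    # with_for_update() renders FOR UPDATE on MySQL/Postgres, taking a row
    # lock. If another transaction already holds that lock, MySQL waits up
    # to innodb_lock_wait_timeout seconds, then raises error 1205 as seen
    # in the traceback above.
    ti = (
        session.query(TaskInstance)
        .filter_by(dag_id="example_bash_operator", task_id="runme_1")
        .with_for_update()
        .first()
    )
    print(ti.state)  # → scheduled
```

On MySQL the 1205 error here suggests another session (presumably the scheduler, since it hangs at the same point) is holding the row lock and never releasing it.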
