GitHub user bangbangDong edited a discussion: The scheduler stopped running due
to a deadlock problem
version: Airflow2.10.4 ubuntu
dag config (scheduler = seconds(30))
```python
with DAG(
dag_id=task_name,
schedule=timedelta(seconds=30),
start_date=datetime(2024, 1, 1),
dagrun_timeout=timedelta(seconds=300),
catchup=False
):
sub_hook = SubprocessHook()
@task
def run_command(execute, arguments, capture, work_dir, env):
command = [execute] + arguments.split()
result = sub_hook.run_command(command, env=env, cwd=work_dir)
if result.exit_code is not 0:
raise ValueError(result.output)
return result.output
run_command(dag_config['exec'], dag_config['arg'],
dag_config.get("capture", False),
dag_config.get('work_dir', XXXXX_PATH /
"config"), dag_config.get('env', None))
```
```sh
[2024-12-28T01:45:01.646+0800] {scheduler_job_runner.py:1016} ERROR - Exception
when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
File
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py",
line 999, in _execute
self._run_scheduler_loop()
File
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1138, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1244, in _do_scheduling
self._create_dagruns_for_dags(guard, session)
File
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/airflow/utils/retries.py",
line 93, in wrapped_function
for attempt in run_with_db_retries(max_retries=retries, logger=logger,
**retry_kwargs):
File
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/tenacity/__init__.py",
line 443, in __iter__
do = self.iter(retry_state=retry_state)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/tenacity/__init__.py",
line 376, in iter
result = action(retry_state)
^^^^^^^^^^^^^^^^^^^
File
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/tenacity/__init__.py",
line 398, in <lambda>
self._add_action_func(lambda rs: rs.outcome.result())
^^^^^^^^^^^^^^^^^^^
File "/opt/XXXXX/local/python/lib/python3.11/concurrent/futures/_base.py",
line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/opt/XXXXX/local/python/lib/python3.11/concurrent/futures/_base.py",
line 401, in __get_result
raise self._exception
File
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/airflow/utils/retries.py",
line 102, in wrapped_function
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1318, in _create_dagruns_for_dags
self._create_dag_runs(non_dataset_dags, session)
File
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/airflow/traces/tracer.py",
line 58, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1353, in _create_dag_runs
dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
^^^^^^^^^^^^^^^^
File
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/attributes.py",
line 487, in __get__
return self.impl.get(state, dict_)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/attributes.py",
line 959, in get
value = self._fire_loader_callables(state, key, passive)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/attributes.py",
line 990, in _fire_loader_callables
return state._load_expired(state, passive)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/state.py",
line 712, in _load_expired
self.manager.expired_attribute_loader(self, toload, passive)
File
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py",
line 1451, in load_scalar_attributes
result = load_on_ident(
^^^^^^^^^^^^^^
File
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py",
line 407, in load_on_ident
return load_on_pk_identity(
^^^^^^^^^^^^^^^^^^^^
File
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py",
line 530, in load_on_pk_identity
session.execute(
File
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py",
line 1716, in execute
conn = self._connection_for_bind(bind)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py",
line 1555, in _connection_for_bind
return self._transaction._connection_for_bind(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py",
line 724, in _connection_for_bind
self._assert_active()
File
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py",
line 604, in _assert_active
raise sa_exc.PendingRollbackError(
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled
back due to a previous exception during flush. To begin a new transaction with
this Session, first issue Session.rollback(). Original exception was:
(pymysql.err.OperationalError) (1213, 'Deadlock found when trying to get lock;
try restarting transaction')
[SQL: UPDATE dag SET next_dagrun=%(next_dagrun)s,
next_dagrun_data_interval_start=%(next_dagrun_data_interval_start)s,
next_dagrun_data_interval_end=%(next_dagrun_data_interval_end)s,
next_dagrun_create_after=%(next_dagrun_create_after)s WHERE dag.dag_id =
%(dag_dag_id)s]
[parameters: {'next_dagrun': datetime.datetime(2024, 12, 27, 17, 45),
'next_dagrun_data_interval_start': datetime.datetime(2024, 12, 27, 17, 45),
'next_dagrun_data_interval_end': datetime.datetime(2024, 12, 27, 17, 45, 30),
'next_dagrun_create_after': datetime.datetime(2024, 12, 27, 17, 45, 30),
'dag_dag_id': 'EVCal'}]
(Background on this error at: https://sqlalche.me/e/14/e3q8) (Background on
this error at: https://sqlalche.me/e/14/7s2a)
```
GitHub link: https://github.com/apache/airflow/discussions/45275
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]