GitHub user bangbangDong edited a discussion: The scheduler stopped running due 
to a deadlock problem

version: Airflow2.10.4    ubuntu



dag config (scheduler = seconds(30))

```python
with DAG(
        dag_id=task_name,
        schedule=timedelta(seconds=30),
        start_date=datetime(2024, 1, 1),
        dagrun_timeout=timedelta(seconds=300),
        catchup=False
):

        sub_hook = SubprocessHook()

        @task
        def run_command(execute, arguments, capture, work_dir, env):
            command = [execute] + arguments.split()
            result = sub_hook.run_command(command, env=env, cwd=work_dir)
            if result.exit_code is not 0:
                raise ValueError(result.output)
            return result.output

        run_command(dag_config['exec'], dag_config['arg'], 
dag_config.get("capture", False),
                               dag_config.get('work_dir', XXXXX_PATH / 
"config"), dag_config.get('env', None))
```


```sh
[2024-12-28T01:45:01.646+0800] {scheduler_job_runner.py:1016} ERROR - Exception 
when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File 
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 999, in _execute
    self._run_scheduler_loop()
  File 
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 1138, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File 
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 1244, in _do_scheduling
    self._create_dagruns_for_dags(guard, session)
  File 
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/airflow/utils/retries.py",
 line 93, in wrapped_function
    for attempt in run_with_db_retries(max_retries=retries, logger=logger, 
**retry_kwargs):
  File 
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/tenacity/__init__.py",
 line 443, in __iter__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File 
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/tenacity/__init__.py",
 line 376, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File 
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/tenacity/__init__.py",
 line 398, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
                                     ^^^^^^^^^^^^^^^^^^^
  File "/opt/XXXXX/local/python/lib/python3.11/concurrent/futures/_base.py", 
line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/opt/XXXXX/local/python/lib/python3.11/concurrent/futures/_base.py", 
line 401, in __get_result
    raise self._exception
  File 
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/airflow/utils/retries.py",
 line 102, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File 
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 1318, in _create_dagruns_for_dags
    self._create_dag_runs(non_dataset_dags, session)
  File 
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/airflow/traces/tracer.py",
 line 58, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File 
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 1353, in _create_dag_runs
    dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
                              ^^^^^^^^^^^^^^^^
  File 
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/attributes.py",
 line 487, in __get__
    return self.impl.get(state, dict_)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File 
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/attributes.py",
 line 959, in get
    value = self._fire_loader_callables(state, key, passive)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File 
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/attributes.py",
 line 990, in _fire_loader_callables
    return state._load_expired(state, passive)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File 
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/state.py",
 line 712, in _load_expired
    self.manager.expired_attribute_loader(self, toload, passive)
  File 
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py",
 line 1451, in load_scalar_attributes
    result = load_on_ident(
             ^^^^^^^^^^^^^^
  File 
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py",
 line 407, in load_on_ident
    return load_on_pk_identity(
           ^^^^^^^^^^^^^^^^^^^^
  File 
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py",
 line 530, in load_on_pk_identity
    session.execute(
  File 
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py",
 line 1716, in execute
    conn = self._connection_for_bind(bind)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File 
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py",
 line 1555, in _connection_for_bind
    return self._transaction._connection_for_bind(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File 
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py",
 line 724, in _connection_for_bind
    self._assert_active()
  File 
"/home/XXXXX/XXXXX/XXXXX/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py",
 line 604, in _assert_active
    raise sa_exc.PendingRollbackError(
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled 
back due to a previous exception during flush. To begin a new transaction with 
this Session, first issue Session.rollback(). Original exception was: 
(pymysql.err.OperationalError) (1213, 'Deadlock found when trying to get lock; 
try restarting transaction')
[SQL: UPDATE dag SET next_dagrun=%(next_dagrun)s, 
next_dagrun_data_interval_start=%(next_dagrun_data_interval_start)s, 
next_dagrun_data_interval_end=%(next_dagrun_data_interval_end)s, 
next_dagrun_create_after=%(next_dagrun_create_after)s WHERE dag.dag_id = 
%(dag_dag_id)s]
[parameters: {'next_dagrun': datetime.datetime(2024, 12, 27, 17, 45), 
'next_dagrun_data_interval_start': datetime.datetime(2024, 12, 27, 17, 45), 
'next_dagrun_data_interval_end': datetime.datetime(2024, 12, 27, 17, 45, 30), 
'next_dagrun_create_after': datetime.datetime(2024, 12, 27, 17, 45, 30), 
'dag_dag_id': 'EVCal'}]
(Background on this error at: https://sqlalche.me/e/14/e3q8) (Background on 
this error at: https://sqlalche.me/e/14/7s2a)
```

GitHub link: https://github.com/apache/airflow/discussions/45275

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]

Reply via email to