GitHub user karenbraganz closed a discussion: Exception in _schedule_dag_run()
due to misconfigured task instance can completely crash scheduling loop
One of Astronomer's customers experienced an issue in which the scheduling loop
crashed because of the way a single task was configured. This impacted all
running DAGs, since the crashing scheduler could not perform its normal
functions. A misconfigured DAG or task instance should not be able to break the
scheduling loop and affect other DAGs; instead, such exceptions should be
caught, allowing the scheduling loop to continue for the remaining DAGs.
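To illustrate, the kind of per-run isolation I have in mind would look roughly
like the hypothetical sketch below (not Airflow's actual code), where a failure
in one DAG run is logged and skipped instead of aborting the whole loop:
```
# Hypothetical sketch of per-run isolation inside the scheduler loop --
# not Airflow's actual code. One failing DAG run is logged and skipped
# instead of crashing scheduling for every other DAG.
callback_tuples = []
for run in dag_runs:
    try:
        callback_tuples.append((run, self._schedule_dag_run(run, session=session)))
    except Exception:
        self.log.exception("Failed to schedule DAG run %s; skipping it this loop", run)
```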
I have not yet been able to reproduce the issue, but I am starting this
discussion to document its details and discuss it with the Airflow community.
When the issue occurred, we noticed that the scheduler was crashlooping with
the following traceback logged:
```
[2025-02-22T18:51:37.004+0000] {scheduler_job_runner.py:863} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 846, in _execute
    self._run_scheduler_loop()
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 978, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1060, in _do_scheduling
    callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/retries.py", line 89, in wrapped_function
    for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 435, in __iter__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 368, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 390, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
                                     ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/retries.py", line 98, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1404, in _schedule_all_dag_runs
    callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1404, in <listcomp>
    callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1472, in _schedule_dag_run
    schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
                                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dagrun.py", line 797, in update_state
    info = self.task_instance_scheduling_decisions(session)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dagrun.py", line 930, in task_instance_scheduling_decisions
    tis = self.get_task_instances(session=session, state=State.task_states)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dagrun.py", line 625, in get_task_instances
    return DagRun.fetch_task_instances(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/api_internal/internal_api_call.py", line 115, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dagrun.py", line 562, in fetch_task_instances
    return session.scalars(tis).all()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/astro/.local/lib/python3.11/site-packages/sqlalchemy/engine/result.py", line 1476, in all
    return self._allrows()
           ^^^^^^^^^^^^^^^
  File "/home/astro/.local/lib/python3.11/site-packages/sqlalchemy/engine/result.py", line 401, in _allrows
    rows = self._fetchall_impl()
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/astro/.local/lib/python3.11/site-packages/sqlalchemy/engine/result.py", line 1389, in _fetchall_impl
    return self._real_result._fetchall_impl()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/astro/.local/lib/python3.11/site-packages/sqlalchemy/engine/result.py", line 1813, in _fetchall_impl
    return list(self.iterator)
           ^^^^^^^^^^^^^^^^^^^
  File "/home/astro/.local/lib/python3.11/site-packages/sqlalchemy/orm/loading.py", line 147, in chunks
    fetch = cursor._raw_all_rows()
            ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/astro/.local/lib/python3.11/site-packages/sqlalchemy/engine/result.py", line 393, in _raw_all_rows
    return [make_row(row) for row in rows]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/astro/.local/lib/python3.11/site-packages/sqlalchemy/engine/result.py", line 393, in <listcomp>
    return [make_row(row) for row in rows]
            ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/sqlalchemy.py", line 254, in process
    value = super_process(value)  # unpickle
            ^^^^^^^^^^^^^^^^^^^^
  File "/home/astro/.local/lib/python3.11/site-packages/sqlalchemy/sql/sqltypes.py", line 1870, in process
    return loads(value)
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 301, in loads
    return load(file, ignore, **kwds)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 287, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 442, in load
    obj = StockUnpickler.load(self)
          ^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: code() argument 13 must be str, not int
```
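The final frames show SQLAlchemy unpickling the task instance's
`executor_config` column, which Airflow stores as a dill pickle. A `TypeError`
from the `code()` constructor during unpickling is typically a sign that a
pickled code object is being loaded under a different Python version than the
one that produced it (the constructor's signature changes between versions); I
am treating that as an assumption, not a confirmed root cause. Under that
assumption, the following sketch scans the raw column with plain SQL, bypassing
the ORM so a bad payload cannot abort the query (table and column names per
Airflow's 2.x schema):
```
import dill
from sqlalchemy import text
from airflow.settings import Session

session = Session()
# Read the pickled bytes directly; the ORM would try (and fail) to
# unpickle them while building result rows.
rows = session.execute(text(
    "SELECT dag_id, run_id, task_id, executor_config "
    "FROM task_instance WHERE executor_config IS NOT NULL"
))
for dag_id, run_id, task_id, blob in rows:
    try:
        dill.loads(blob)
    except Exception as exc:
        print(f"undeserializable executor_config: {dag_id} {run_id} {task_id} "
              f"({type(exc).__name__}: {exc})")
```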
The `_schedule_dag_run`
[method](https://github.com/apache/airflow/blob/e810f00d2d7ee622bd3ff023dd5941a769425429/airflow/jobs/scheduler_job_runner.py#L1553)
in the traceback suggested that the failure originated while scheduling a
specific DAG run.
To isolate the DAG run causing this, we ran the following code in the
standalone DAG processor, which was not crashlooping:
```
from airflow.settings import Session
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
from airflow.utils.state import DagRunState

session = Session()
dagbag = DagBag()

# All currently running DAG runs.
drs = session.query(DagRun).filter(
    DagRun.state == DagRunState.RUNNING,
).all()

for dr in drs:
    dag_id = dr.dag_id
    run_id = dr.run_id
    # Print before the scheduling call so the offending run is
    # identified even if the next lines raise.
    print(dag_id, run_id)
    dag = dagbag.get_dag(dag_id)
    dr.dag = dag
    info = dr.task_instance_scheduling_decisions(session)
```
This code loops through all running DAG runs and calls the
`task_instance_scheduling_decisions()`
[method](https://github.com/apache/airflow/blob/e810f00d2d7ee622bd3ff023dd5941a769425429/airflow/models/dagrun.py#L1103)
(which is downstream of `_schedule_dag_run` in the traceback) for each run. It
also prints the DAG ID and run ID on each iteration so that the responsible DAG
run can be identified when the exception is raised mid-loop.
When we ran this code in the DAG processor, we saw the same traceback, and the
print statement identified the DAG causing it. As soon as we manually marked
that running DAG run as failed and paused the DAG, the issue was resolved (the
scheduler stopped crashing and was able to schedule other DAG runs),
confirming that this DAG was indeed breaking the scheduling loop.
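As a side note, wrapping the per-run call in a try/except (an untested variant
of the loop above) would surface every affected run in one pass instead of
stopping at the first failure:
```
for dr in drs:
    dr.dag = dagbag.get_dag(dr.dag_id)
    try:
        dr.task_instance_scheduling_decisions(session)
    except Exception as exc:
        # Report and continue instead of crashing on the first bad run.
        print(f"{dr.dag_id} {dr.run_id}: {type(exc).__name__}: {exc}")
        session.rollback()  # reset the session in case the failure left it dirty
```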
We were also able to isolate the offending task instance with the code below:
```
# "task_id_placeholder" stands in for the real task ID.
DagRun.fetch_task_instances(
    dag_id=dr.dag_id,
    run_id=dr.run_id,
    task_ids=["task_id_placeholder"],
    state=None,
    session=session,
)
```
The task instance is configured as shown below (I have swapped out the task ID
and some of the other names):
```
task_1 = PythonOperator(
    task_id="task_1",
    python_callable=task_1_callable,
    executor_config=new_get_default_args(l_memory=20),
)
```
We suspect that either the `python_callable` or the `executor_config` is
causing this, but we have not found the root cause. The `new_get_default_args`
helper returns a deep copy of a dictionary that contains several keys besides
`pod_override`, which is not how `executor_config` should be defined. However,
simply defining `executor_config` in this manner does not trigger the issue on
its own (according to tests that I ran).
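For reference, the conventional shape of a Kubernetes `executor_config` is a
dict whose only key is `pod_override`, holding a `k8s.V1Pod`. The sketch below
is illustrative only; mapping `l_memory=20` to a `20Gi` memory request is my
assumption about what the helper intends:
```
from kubernetes.client import models as k8s

# Conventional executor_config: a single "pod_override" key whose value
# is a V1Pod; "base" is the conventional name of the Airflow task container.
executor_config = {
    "pod_override": k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",
                    resources=k8s.V1ResourceRequirements(
                        requests={"memory": "20Gi"},  # assumed mapping of l_memory=20
                    ),
                )
            ]
        )
    )
}
```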
These are all the details we have so far. Please respond if you have any
thoughts/questions or have experienced a similar issue.
GitHub link: https://github.com/apache/airflow/discussions/47042