GitHub user karenbraganz closed a discussion: Exception in _schedule_dag_run() due to misconfigured task instance can completely crash scheduling loop

One of Astronomer's customers hit an issue where the scheduling loop crashed because of the way a single task was configured. This impacted all running DAGs, since the crashing scheduler could not carry out its normal functions. A misconfigured DAG or task instance should not be able to break the scheduling loop and affect other DAGs; instead, such exceptions should be caught so that the scheduling loop can continue for the remaining DAGs.
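To make the proposed behavior concrete, here is a minimal, self-contained sketch (not Airflow's actual code; the function names are stand-ins) of catching per-DAG-run exceptions so that one bad run cannot take down the whole loop:
```python
import logging

logging.basicConfig(level=logging.ERROR)
log = logging.getLogger("scheduler-sketch")

def schedule_dag_run(run_id: str) -> int:
    # Hypothetical stand-in for SchedulerJobRunner._schedule_dag_run().
    if run_id == "bad_run":
        raise TypeError("code() argument 13 must be str, not int")
    return 1  # pretend one task instance was scheduled

def schedule_all_dag_runs(run_ids: list) -> list:
    results = []
    for run_id in run_ids:
        try:
            results.append((run_id, schedule_dag_run(run_id)))
        except Exception:
            # One misconfigured run should not crash the whole loop.
            log.exception("Failed to schedule DAG run %s; skipping", run_id)
    return results

print(schedule_all_dag_runs(["run_a", "bad_run", "run_b"]))
# The healthy runs are still scheduled even though "bad_run" raised.
```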

I have not been able to reproduce the issue yet but am starting a discussion to 
document the details of this issue and discuss this with the Airflow community.

When the issue occurred, we noticed that the scheduler was crashlooping with 
the below traceback logged:
```
[2025-02-22T18:51:37.004+0000] {scheduler_job_runner.py:863} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 846, in _execute
    self._run_scheduler_loop()
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 978, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1060, in _do_scheduling
    callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/retries.py", line 89, in wrapped_function
    for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 435, in __iter__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 368, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 390, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
                                     ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/retries.py", line 98, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1404, in _schedule_all_dag_runs
    callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1404, in <listcomp>
    callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1472, in _schedule_dag_run
    schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
                                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dagrun.py", line 797, in update_state
    info = self.task_instance_scheduling_decisions(session)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dagrun.py", line 930, in task_instance_scheduling_decisions
    tis = self.get_task_instances(session=session, state=State.task_states)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dagrun.py", line 625, in get_task_instances
    return DagRun.fetch_task_instances(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/api_internal/internal_api_call.py", line 115, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dagrun.py", line 562, in fetch_task_instances
    return session.scalars(tis).all()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/astro/.local/lib/python3.11/site-packages/sqlalchemy/engine/result.py", line 1476, in all
    return self._allrows()
           ^^^^^^^^^^^^^^^
  File "/home/astro/.local/lib/python3.11/site-packages/sqlalchemy/engine/result.py", line 401, in _allrows
    rows = self._fetchall_impl()
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/astro/.local/lib/python3.11/site-packages/sqlalchemy/engine/result.py", line 1389, in _fetchall_impl
    return self._real_result._fetchall_impl()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/astro/.local/lib/python3.11/site-packages/sqlalchemy/engine/result.py", line 1813, in _fetchall_impl
    return list(self.iterator)
           ^^^^^^^^^^^^^^^^^^^
  File "/home/astro/.local/lib/python3.11/site-packages/sqlalchemy/orm/loading.py", line 147, in chunks
    fetch = cursor._raw_all_rows()
            ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/astro/.local/lib/python3.11/site-packages/sqlalchemy/engine/result.py", line 393, in _raw_all_rows
    return [make_row(row) for row in rows]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/astro/.local/lib/python3.11/site-packages/sqlalchemy/engine/result.py", line 393, in <listcomp>
    return [make_row(row) for row in rows]
            ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/sqlalchemy.py", line 254, in process
    value = super_process(value)  # unpickle
            ^^^^^^^^^^^^^^^^^^^^
  File "/home/astro/.local/lib/python3.11/site-packages/sqlalchemy/sql/sqltypes.py", line 1870, in process
    return loads(value)
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 301, in loads
    return load(file, ignore, **kwds)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 287, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 442, in load
    obj = StockUnpickler.load(self)
          ^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: code() argument 13 must be str, not int
```
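The bottom of the traceback shows why one bad row is enough to take down the whole fetch: the column is stored pickled in the metadata database, and SQLAlchemy unpickles it lazily while materializing each row (`[make_row(row) for row in rows]`), so a single row whose blob cannot be unpickled makes the entire query raise. Here is a minimal stdlib-only simulation of that failure mode (Airflow actually uses dill here; the row contents below are invented):
```python
import pickle

# Simulated pickled executor_config blobs as they might sit in the DB.
rows = [
    ("task_a", pickle.dumps({"KubernetesExecutor": {"limit_memory": "20Gi"}})),
    ("task_b", b"\x80\x04corrupted-bytes"),  # a blob that cannot be unpickled
    ("task_c", pickle.dumps({})),
]

def make_row(row):
    # Mirrors SQLAlchemy's per-row column processing: unpickling
    # happens while the row is being materialized.
    task_id, blob = row
    return (task_id, pickle.loads(blob))

try:
    # One bad row poisons the whole result set, even though the
    # other rows are perfectly loadable.
    all_rows = [make_row(row) for row in rows]
except Exception as exc:
    print(f"fetch failed because of one bad row: {type(exc).__name__}")
```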
The `_schedule_dag_run` [method](https://github.com/apache/airflow/blob/e810f00d2d7ee622bd3ff023dd5941a769425429/airflow/jobs/scheduler_job_runner.py#L1553) in the traceback suggested that the crash originated from the scheduling of a specific DAG run.

In order to isolate which DAG run was causing this, we ran the following code 
in the standalone DAG processor, which was not crashlooping:
```
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
from airflow.settings import Session
from airflow.utils.state import DagRunState

session = Session()
dagbag = DagBag()

# All currently running DAG runs
drs = session.query(DagRun).filter(
    DagRun.state == DagRunState.RUNNING,
).all()

for dr in drs:
    # Print first, so the offending run is the last one printed before the crash
    print(dr.dag_id, dr.run_id)
    dr.dag = dagbag.get_dag(dr.dag_id)
    info = dr.task_instance_scheduling_decisions(session)
```
This code loops through all running DAG runs and calls the `task_instance_scheduling_decisions()` [method](https://github.com/apache/airflow/blob/e810f00d2d7ee622bd3ff023dd5941a769425429/airflow/models/dagrun.py#L1103) (which is downstream of `_schedule_dag_run` in the traceback) for each run. It also prints the DAG ID and run ID on each iteration, so that the responsible DAG can be identified when the exception is raised partway through the loop.

When we ran this code in the DAG processor, we saw the same traceback, and the print output identified the DAG causing it. As soon as we manually marked that running DAG run as failed and paused the DAG, the issue was resolved (the scheduler stopped crashing and was able to schedule other DAG runs), confirming that this DAG was indeed breaking the scheduling loop.

We were also able to isolate the task instance causing this with the below code:
```
DagRun.fetch_task_instances(
    dag_id=dr.dag_id,
    run_id=dr.run_id,
    task_ids=["task_id_placeholder"],
    state=None,
    session=session,
)
```
The task instance is configured as below (I have swapped out the task ID and some of the other names):
```
task_1 = PythonOperator(
    task_id="task_1",
    python_callable=task_1_callable,
    executor_config=new_get_default_args(l_memory=20),
)
```

We suspect either the `python_callable` or the `executor_config` is responsible, but we have not found the root cause. The `executor_config` callable returns a deep copy of a dictionary that contains several keys besides the `pod_override` key, which is not how `executor_config` should be defined. However, simply defining `executor_config` in this manner does not trigger the issue (according to the tests that I ran).
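Since the traceback fails inside unpickling, one way to vet a suspect `executor_config` without involving the scheduler is to check whether its value survives a serialization round trip. This is a rough diagnostic sketch only: it uses the stdlib `pickle` module, whereas Airflow may use dill for this column, so passing this check does not rule the config out.
```python
import pickle

def roundtrips(config) -> bool:
    """Return True if `config` survives a pickle round trip."""
    try:
        pickle.loads(pickle.dumps(config))
        return True
    except Exception:
        return False

# A plain pod-resources-style dict round-trips fine...
print(roundtrips({"KubernetesExecutor": {"limit_memory": "20Gi"}}))  # True

# ...but a config that smuggles in a function (anything carrying a code
# object) does not survive plain pickle.
print(roundtrips({"callback": lambda: None}))  # False
```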

These are all the details we have so far. Please respond if you have any thoughts/questions or have experienced a similar issue.


GitHub link: https://github.com/apache/airflow/discussions/47042
