SeiilkanM opened a new issue, #59693:
URL: https://github.com/apache/airflow/issues/59693

   ### Apache Airflow version
   
   Other Airflow 3 version (please specify below)
   
   ### If "Other Airflow 3 version" selected, which one?
   
   3.1.7
   
   ### What happened?
   
   We're seeing scheduler crash loops that look similar to the earlier 
DetachedInstanceError issues, but the failing code path seems different.
   Observed error (from scheduler pod logs):
   ```
   2025-12-21T08:56:22.920709Z [info     ] DagRun Finished: 
dag_id=airflow_events_poller, logical_date=2025-12-20 19:50:00+00:00, 
run_id=scheduled__2025-12-20T19:50:00+00:00, run_start_date=2025-12-20 
20:00:00.272899+00:00, run_end_date=2025-12-21 08:56:22.914038+00:00, 
run_duration=46582.641139, state=success, run_type=scheduled, 
data_interval_start=2025-12-20 19:50:00+00:00, data_interval_end=2025-12-20 
20:00:00+00:00, [airflow.models.dagrun.DagRun] loc=dagrun.py:1274
   2025-12-21T08:56:22.931539Z [info     ] DAG airflow_events_poller is at (or 
above) max_active_runs (1 of 1), not creating any more runs 
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:1756
   2025-12-21T08:56:22.980612Z [info     ] Run 
scheduled__2025-12-19T08:00:00+00:00 of data_pipeline_snowflake has timed-out 
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:1942
   2025-12-21T08:56:22.981985Z [info     ] DAG data_pipeline_snowflake is at 
(or above) max_active_runs (1 of 1), not creating any more runs 
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:1756
   2025-12-21T08:56:22.992124Z [error    ] Exception when executing 
SchedulerJob._run_scheduler_loop 
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:1082
   Traceback (most recent call last):
     File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", 
line 1078, in _execute
       self._run_scheduler_loop()
     File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", 
line 1368, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", 
line 1478, in _do_scheduling
       callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/airflow/utils/retries.py", 
line 97, in wrapped_function
       for attempt in run_with_db_retries(max_retries=retries, logger=logger, 
**retry_kwargs):
                      
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 
435, in __iter__
       do = self.iter(retry_state=retry_state)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 
368, in iter
       result = action(retry_state)
                ^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 
390, in <lambda>
       self._add_action_func(lambda rs: rs.outcome.result())
                                        ^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 449, in 
result
       return self.__get_result()
              ^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in 
__get_result
       raise self._exception
     File "/usr/local/lib/python3.12/site-packages/airflow/utils/retries.py", 
line 106, in wrapped_function
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", 
line 1888, in _schedule_all_dag_runs
       callback_tuples = [(run, self._schedule_dag_run(run, session=session)) 
for run in dag_runs]
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", 
line 1955, in _schedule_dag_run
       context_from_server=DagRunContext(
                           ^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/pydantic/main.py", line 253, 
in __init__
       validated_self = self.__pydantic_validator__.validate_python(data, 
self_instance=self)
                        
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   pydantic_core._pydantic_core.ValidationError: 1 validation error for 
DagRunContext
   dag_run.consumed_asset_events
     Error extracting attribute: DetachedInstanceError: Parent instance <DagRun 
at 0x7ffb07d401a0> is not bound to a Session; lazy load operation of attribute 
'consumed_asset_events' cannot proceed (Background on this error at: 
https://sqlalche.me/e/14/bhk3) [type=get_attribute_error, input_value=<DagRun 
data_pipeline_sno...00. run_type: scheduled>, input_type=DagRun]
       For further information visit 
https://errors.pydantic.dev/2.11/v/get_attribute_error
   2025-12-21T08:56:22.993908Z [info     ] Exited execute loop            
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:1094
   Traceback (most recent call last):
     File "/usr/local/bin/airflow", line 10, in <module>
       sys.exit(main())
                ^^^^^^
     File "/usr/local/lib/python3.12/site-packages/airflow/__main__.py", line 
55, in main
       args.func(args)
     File "/usr/local/lib/python3.12/site-packages/airflow/cli/cli_config.py", 
line 49, in command
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/airflow/utils/cli.py", line 
114, in wrapper
       return f(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py",
 line 54, in wrapped_function
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py",
 line 52, in scheduler
       run_command_with_daemon_option(
     File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", 
line 86, in run_command_with_daemon_option
       callback()
     File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py",
 line 55, in <lambda>
       callback=lambda: _run_scheduler_job(args),
                        ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py",
 line 43, in _run_scheduler_job
       run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File "/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", 
line 100, in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 
368, in run_job
       return execute_job(job, execute_callable=execute_callable)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 
397, in execute_job
       ret = execute_callable()
             ^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", 
line 1078, in _execute
       self._run_scheduler_loop()
     File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", 
line 1368, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", 
line 1478, in _do_scheduling
       callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/airflow/utils/retries.py", 
line 97, in wrapped_function
       for attempt in run_with_db_retries(max_retries=retries, logger=logger, 
**retry_kwargs):
                      
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 
435, in __iter__
       do = self.iter(retry_state=retry_state)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 
368, in iter
       result = action(retry_state)
                ^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 
390, in <lambda>
       self._add_action_func(lambda rs: rs.outcome.result())
                                        ^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 449, in 
result
       return self.__get_result()
              ^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in 
__get_result
       raise self._exception
     File "/usr/local/lib/python3.12/site-packages/airflow/utils/retries.py", 
line 106, in wrapped_function
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", 
line 1888, in _schedule_all_dag_runs
       callback_tuples = [(run, self._schedule_dag_run(run, session=session)) 
for run in dag_runs]
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", 
line 1955, in _schedule_dag_run
       context_from_server=DagRunContext(
                           ^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/site-packages/pydantic/main.py", line 253, 
in __init__
       validated_self = self.__pydantic_validator__.validate_python(data, 
self_instance=self)
                        
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   pydantic_core._pydantic_core.ValidationError: 1 validation error for 
DagRunContext
   dag_run.consumed_asset_events
     Error extracting attribute: DetachedInstanceError: Parent instance <DagRun 
at 0x7ffb07d401a0> is not bound to a Session; lazy load operation of attribute 
'consumed_asset_events' cannot proceed (Background on this error at: 
https://sqlalche.me/e/14/bhk3) [type=get_attribute_error, input_value=<DagRun 
data_pipeline_sno...00. run_type: scheduled>, input_type=DagRun]
       For further information visit 
https://errors.pydantic.dev/2.11/v/get_attribute_error
   ```
   The scheduler exited repeatedly due to:
   DagRunContext validation failure -> DetachedInstanceError on
dag_run.consumed_asset_events
   It looks like this crash is in the DagRun timeout callback path, which still
uses a detached DagRun and needs a fix. Can someone please confirm?
   
    [Slack 
thread](https://astronomer.slack.com/archives/CGQSYG25V/p1766326233177979?thread_ts=1759243320.000079&cid=CGQSYG25V)
   
   [Related Airline ticket](https://astronomer.zendesk.com/agent/tickets/86525)
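
   The failure chain (pydantic's attribute validation touching a lazy relationship on a DagRun that is no longer bound to a Session) can be reproduced in miniature outside Airflow. The following is a hedged sketch with toy models, not Airflow's actual schema:

```python
# Minimal sketch (illustrative models, not Airflow code) of the failure mode:
# a lazy-loaded relationship raises DetachedInstanceError once the parent
# object is detached from its Session.
from sqlalchemy import Column, ForeignKey, Integer, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship
from sqlalchemy.orm.exc import DetachedInstanceError

Base = declarative_base()

class DagRun(Base):
    __tablename__ = "dag_run"
    id = Column(Integer, primary_key=True)
    # Lazy by default: loaded only on attribute access, which needs a Session.
    consumed_asset_events = relationship("AssetEvent")

class AssetEvent(Base):
    __tablename__ = "asset_event"
    id = Column(Integer, primary_key=True)
    dag_run_id = Column(Integer, ForeignKey("dag_run.id"))

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(DagRun(id=1))
    session.commit()
    run = session.get(DagRun, 1)

# The Session is now closed, so `run` is detached and the lazy load fails.
try:
    _ = run.consumed_asset_events
except DetachedInstanceError as exc:
    print(type(exc).__name__)  # DetachedInstanceError
```

   In Airflow's case the same access is triggered indirectly, when pydantic pulls `consumed_asset_events` off the DagRun while building `DagRunContext`.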
   
   ### What you think should happen instead?
   
   The scheduler should not crash when a DagRun times out. At minimum, it should
handle missing/lazy-loaded relationship fields safely (or ensure the DagRun is
session-bound / eagerly loaded before building DagRunContext).
   
   ### How to reproduce
   
   This was surfaced by an Airline alert ("Tasks exceeded requeue attempts") on
the Prod deployment. The customer appears to be using the Remote Executor. The
crash is in the DagRun timeout callback path, which still uses a detached DagRun
and needs a fix.
   
   ### Operating System
   
   macOS
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   
https://cmayoi25a305o01o54bcqe81w.astronomer.run/dhrsn2n3/dags/ai_intake_routing_agent_evals/runs/scheduled__2025-12-20T06%3A00%3A00%2B00%3A00/tasks/create_dataset_flow
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
