tirkarthi opened a new issue, #47735:
URL: https://github.com/apache/airflow/issues/47735

   ### Apache Airflow version
   
   main (development)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   Running the attached reproducer DAG, which expands a task over a literal list, crashes the scheduler with the stack trace below. It looks like `SchedulerListOfDictsExpandInput` in the same file also needs to implement the `resolve` method (a rough sketch of what such a method might look like follows the stack trace).
   
   ```
   [2025-03-13T20:56:12.085+0530] {cli_action_loggers.py:97} DEBUG - Calling callbacks: []
   Traceback (most recent call last):
     File "/home/karthikeyan/stuff/python/airflow/.venv/bin/airflow", line 10, in <module>
       sys.exit(main())
   [2025-03-13 20:56:12 +0530] [12759] [INFO] Worker exiting (pid: 12759)
                ^^^^^^
   [2025-03-13 20:56:12 +0530] [12760] [INFO] Worker exiting (pid: 12760)
     File "/home/karthikeyan/stuff/python/airflow/airflow/__main__.py", line 58, in main
       args.func(args)
     File "/home/karthikeyan/stuff/python/airflow/airflow/cli/cli_config.py", line 49, in command
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/utils/cli.py", line 111, in wrapper
       return f(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/scheduler_command.py", line 52, in scheduler
       run_command_with_daemon_option(
     File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/daemon_utils.py", line 86, in run_command_with_daemon_option
       callback()
     File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/scheduler_command.py", line 55, in <lambda>
       callback=lambda: _run_scheduler_job(args),
                        ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/scheduler_command.py", line 43, in _run_scheduler_job
       run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File "/home/karthikeyan/stuff/python/airflow/airflow/utils/session.py", line 101, in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/job.py", line 342, in run_job
       return execute_job(job, execute_callable=execute_callable)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/job.py", line 371, in execute_job
       ret = execute_callable()
             ^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 937, in _execute
       self._run_scheduler_loop()
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 1063, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 1163, in _do_scheduling
       callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/utils/retries.py", line 93, in wrapped_function
       for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py", line 443, in __iter__
       do = self.iter(retry_state=retry_state)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
       result = action(retry_state)
                ^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py", line 398, in <lambda>
       self._add_action_func(lambda rs: rs.outcome.result())
                                        ^^^^^^^^^^^^^^^^^^^
     File "/usr/lib/python3.11/concurrent/futures/_base.py", line 449, in result
       return self.__get_result()
              ^^^^^^^^^^^^^^^^^^^
     File "/usr/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
       raise self._exception
     File "/home/karthikeyan/stuff/python/airflow/airflow/utils/retries.py", line 102, in wrapped_function
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 1569, in _schedule_all_dag_runs
       callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 1569, in <listcomp>
       callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 1683, in _schedule_dag_run
       dag_run.schedule_tis(schedulable_tis, session, max_tis_per_query=self.job.max_tis_per_query)
     File "/home/karthikeyan/stuff/python/airflow/airflow/utils/session.py", line 98, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/models/dagrun.py", line 1711, in schedule_tis
       start_from_trigger = ti.task.expand_start_from_trigger(context=context, session=session)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/models/mappedoperator.py", line 64, in expand_start_from_trigger
       mapped_kwargs, _ = self._expand_mapped_kwargs(context)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/airflow/sdk/definitions/mappedoperator.py", line 691, in _expand_mapped_kwargs
       return self._get_specified_expand_input().resolve(context)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   AttributeError: 'SchedulerDictOfListsExpandInput' object has no attribute 'resolve'
   [2025-03-13 20:56:12 +0530] [12758] [INFO] Shutting down: Master
   ```
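
   For illustration, here is a rough sketch of the kind of `resolve` method the scheduler-side expand inputs appear to be missing. The class body, the signature, and the map-index lookup are assumptions made only from the call site visible in the traceback; this is not the actual Airflow implementation.

   ```python
   # Hypothetical sketch only -- not the real Airflow code. It only illustrates
   # the 2-tuple return shape that `_expand_mapped_kwargs` unpacks in the
   # traceback above.


   class SchedulerDictOfListsExpandInput:
       """Scheduler-side expand input for ``.expand(key=[...])`` (sketch)."""

       def __init__(self, value: dict[str, list]) -> None:
           # e.g. {"target_time": ["2026-11-22 00:00:03", "2026-11-22 00:00:10"]}
           self.value = value

       def resolve(self, context: dict) -> tuple[dict, set]:
           # Pick the element of each literal list that matches this task
           # instance's map index (assumed to be reachable via the context).
           map_index = context["ti"].map_index
           mapped_kwargs = {key: values[map_index] for key, values in self.value.items()}
           # The second element mirrors the set of resolved XCom ids returned by
           # the non-scheduler expand inputs; empty for a purely literal list.
           return mapped_kwargs, set()
   ```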
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   1. Run the DAG below with the scheduler running in another tab; since the DAG has `schedule=None`, trigger it manually (see step 2 after the DAG).
   
   ```python
   from __future__ import annotations
   
   from datetime import datetime, timedelta
   
   from airflow import DAG
   from airflow.sensors.date_time import DateTimeSensorAsync
   from airflow.utils import timezone
   
   with DAG(
       dag_id="file_trigger_expand_crash_1",
       start_date=datetime(2021, 1, 1),
       catchup=False,
       schedule=None,
   ) as dag:
       instant = timezone.datetime(2026, 11, 22)
       task = DateTimeSensorAsync.partial(task_id="async", poke_interval=3).expand(
           target_time=[str(instant + timedelta(seconds=3)), str(instant + timedelta(seconds=10))]
       )
   
       task
   ```
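
   2. Once the DAG is parsed, unpause and trigger it. The commands below are the standard CLI invocations and are given only as a pointer; the exact invocation may differ on current main.

   ```
   airflow dags unpause file_trigger_expand_crash_1
   airflow dags trigger file_trigger_expand_crash_1
   ```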
   
   ### Operating System
   
   Ubuntu 20.04
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

