vatsrahul1001 opened a new issue, #55026:
URL: https://github.com/apache/airflow/issues/55026

   ### Apache Airflow version
   
   main (development)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   Triggerer crashing with below trace
   ```
   
   triggerer                                                                    
   ^^^^^^^^^^^^^^^^^
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/_internal/expandinput.py",
 line 151, in _get_length
   triggerer     return get_task_map_length(v, resolved_vals[k], 
upstream_map_indexes)
   triggerer            
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer   File "/usr/lib64/python3.12/functools.py", line 912, in wrapper
   triggerer     return dispatch(args[0].__class__)(*args, **kw)
   triggerer            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/xcom_arg.py", 
line 591, in _
   triggerer     return (upstream_map_indexes.get(task_id) or 1) * 
len(resolved_val)
   triggerer                                                       
^^^^^^^^^^^^^^^^^
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/lazy_sequence.py",
 line 90, in __len__
   triggerer     from airflow.sdk.execution_time.task_runner import 
SUPERVISOR_COMMS
   triggerer ImportError: cannot import name 'SUPERVISOR_COMMS' from 
'airflow.sdk.execution_time.task_runner' 
(/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py)
   triggerer [2025-08-28T08:21:30.608+0000] {triggerer_job_runner.py:178} INFO 
- Waiting for triggers to clean up
   triggerer 2025-08-28 08:21:30 [info     [] Process exited                 
[supervisor] exit_code=<Negsignal.SIGINT: -2> pid=21 signal_sent=SIGINT
   triggerer [2025-08-28T08:21:30.709+0000] {triggerer_job_runner.py:183} INFO 
- Exited trigger loop
   triggerer Traceback (most recent call last):
   triggerer   File "/usr/local/bin/airflow", line 10, in <module>
   triggerer     sys.exit(main())
   triggerer              ^^^^^^
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/__main__.py", line 55, in main
   triggerer     args.func(args)
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, 
in command
   triggerer     return func(*args, **kwargs)
   triggerer            ^^^^^^^^^^^^^^^^^^^^^
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/utils/cli.py", line 114, in 
wrapper
   triggerer     return f(*args, **kwargs)
   triggerer            ^^^^^^^^^^^^^^^^^^
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py",
 line 54, in wrapped_function
   triggerer     return func(*args, **kwargs)
   triggerer            ^^^^^^^^^^^^^^^^^^^^^
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py",
 line 69, in triggerer
   triggerer     run_command_with_daemon_option(
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", 
line 86, in run_command_with_daemon_option
   triggerer     callback()
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py",
 line 72, in <lambda>
   triggerer     callback=lambda: triggerer_run(args.skip_serve_logs, 
args.capacity, triggerer_heartrate),
   triggerer                      
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py",
 line 55, in triggerer_run
   triggerer     run_job(job=triggerer_job_runner.job, 
execute_callable=triggerer_job_runner._execute)
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, 
in wrapper
   triggerer     return func(*args, session=session, **kwargs)
   triggerer            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 368, in 
run_job
   triggerer     return execute_job(job, execute_callable=execute_callable)
   triggerer            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 397, in 
execute_job
   triggerer     ret = execute_callable()
   triggerer           ^^^^^^^^^^^^^^^^^^
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", 
line 172, in _execute
   triggerer     self.trigger_runner.run()
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", 
line 528, in run
   triggerer     self.load_triggers()
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/traces/tracer.py", line 58, in 
wrapper
   triggerer     return func(*args, **kwargs)
   triggerer            ^^^^^^^^^^^^^^^^^^^^^
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", 
line 551, in load_triggers
   triggerer     self.update_triggers(set(ids))
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", 
line 685, in update_triggers
   triggerer     workload = create_workload(new_trigger_orm)
   triggerer                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", 
line 626, in create_workload
   triggerer     trigger = expand_start_trigger_args(trigger)
   triggerer               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", 
line 616, in expand_start_trigger_args
   triggerer     task.render_template_fields(context=context)
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/mappedoperator.py",
 line 806, in render_template_fields
   triggerer     mapped_kwargs, seen_oids = self._expand_mapped_kwargs(context)
   triggerer                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/mappedoperator.py",
 line 699, in _expand_mapped_kwargs
   triggerer     return self._get_specified_expand_input().resolve(context)
   triggerer            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/_internal/expandinput.py",
 line 213, in resolve
   triggerer     all_lengths = self._get_map_lengths(sized_resolved, 
upstream_map_indexes)
   triggerer                   
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/_internal/expandinput.py",
 line 160, in _get_map_lengths
   triggerer     k: res for k, v in self.value.items() if v is not None if (res 
:= _get_length(k, v)) is not None
   triggerer                                                                    
   ^^^^^^^^^^^^^^^^^
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/_internal/expandinput.py",
 line 151, in _get_length
   triggerer     return get_task_map_length(v, resolved_vals[k], 
upstream_map_indexes)
   triggerer            
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer   File "/usr/lib64/python3.12/functools.py", line 912, in wrapper
   triggerer     return dispatch(args[0].__class__)(*args, **kw)
   triggerer            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/sdk/definitions/xcom_arg.py", 
line 591, in _
   triggerer     return (upstream_map_indexes.get(task_id) or 1) * 
len(resolved_val)
   triggerer                                                       
^^^^^^^^^^^^^^^^^
   triggerer   File 
"/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/lazy_sequence.py",
 line 90, in __len__
   triggerer     from airflow.sdk.execution_time.task_runner import 
SUPERVISOR_COMMS
   triggerer ImportError: cannot import name 'SUPERVISOR_COMMS' from 
'airflow.sdk.execution_time.task_runner' 
(/usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py)
   triggerer INFO:     Waiting for child process [19]
   triggerer INFO:     Child process [19] died
   triggerer INFO:     Waiting for child process [20]
   triggerer INFO:     Child process [20] died
   triggerer INFO:     Received SIGTERM, exiting.
   triggerer INFO:     Terminated child process [22]
   triggerer INFO:     Terminated child process [39]
   triggerer INFO:     Waiting for child process [22]
   triggerer INFO:     Waiting for child process [39]
   triggerer INFO:     Stopping parent process [17]
   stream closed EOF for 
radiant-voltage-1751/radiant-voltage-1751-triggerer-6977c7c48-n4wdh (triggerer)
   ```
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   ```
   Run below DAG in main
   
   from datetime import datetime, timedelta
   from time import sleep
   
   from airflow.sdk import DAG
   from airflow.decorators import task
   from airflow.models.taskinstance import TaskInstance
   from airflow.providers.standard.operators.python import PythonOperator
   from airflow.providers.standard.sensors.date_time import DateTimeSensor, 
DateTimeSensorAsync
   from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor, 
TimeDeltaSensorAsync
   
   delays = [30, 60, 90]
   
   
   @task
   def get_delays():
       return delays
   
   
   @task
   def get_wakes(delay, **context):
       "Wake {delay} seconds after the task starts"
       ti: TaskInstance = context["ti"]
       return (ti.start_date + timedelta(seconds=delay)).isoformat()
   
   
   with DAG(
       dag_id="datetime_mapped",
       start_date=datetime(1970, 1, 1),
       schedule=None,
       tags=["taskmap"] 
   ) as dag:
   
       wake_times = get_wakes.expand(delay=get_delays())
   
       
DateTimeSensor.partial(task_id="expanded_datetime").expand(target_time=wake_times)
       TimeDeltaSensor.partial(task_id="expanded_timedelta").expand(
           delta=list(map(lambda x: timedelta(seconds=x), [30, 60, 90]))
       )
   
       DateTimeSensorAsync.partial(task_id="expanded_datetime_async").expand(
           target_time=wake_times
       )
       TimeDeltaSensorAsync.partial(task_id="expanded_timedelta_async").expand(
           delta=list(map(lambda x: timedelta(seconds=x), [30, 60, 90]))
       )
   
       TimeDeltaSensor(task_id="static_timedelta", delta=timedelta(seconds=90))
       DateTimeSensor(
           task_id="static_datetime",
           target_time="{{macros.datetime.now() + 
macros.timedelta(seconds=90)}}",
       )
   
       PythonOperator(task_id="op_sleep_90", python_callable=lambda: sleep(90))
   ```
   
   ### Operating System
   
   linux
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to