tirkarthi opened a new issue, #57081:
URL: https://github.com/apache/airflow/issues/57081

   ### Apache Airflow version
   
   main (development)
   
   ### If "Other Airflow 2/3 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   Our setup shards the dag processor so that each instance handles a different folder. A dag processor started with a bundle name via `-B` holds references only to that bundle. When it picks up a callback that belongs to another bundle, it crashes while trying to fetch the log file name.
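
   The crash mechanics are easy to see in isolation: `next()` over a generator that yields no matching element, with no default, raises `StopIteration`. A minimal sketch of the pattern (the `Bundle` class and variable names here are illustrative, not the actual manager code):

   ```python
   from dataclasses import dataclass


   @dataclass
   class Bundle:
       name: str


   # A processor started with `-B dags-folder-2` only loads that bundle.
   dag_bundles = [Bundle(name="dags-folder-2")]

   # A callback persisted by the other shard still references "dags-folder-1".
   callback_bundle_name = "dags-folder-1"

   # Mirrors the lookup in _render_log_filename: next() without a default
   # raises StopIteration because no loaded bundle matches.
   bundle = next(b for b in dag_bundles if b.name == callback_bundle_name)
   ```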
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   1. Create dag_1.py in `/home/karthikeyan/airflow/dags1`.
   2. Create dag_2.py in `/home/karthikeyan/airflow/dags2`.
   3. Start two dag processors, one per bundle: `airflow dag-processor -vvv -B dags-folder-1` and `airflow dag-processor -vvv -B dags-folder-2`.
   4. Wait for the dags to appear in the UI.
   5. Stop the dags-folder-1 and dags-folder-2 dag processors.
   6. Trigger dag_1 from the UI; its deferred trigger fires and a callback is written to the db.
   7. Start the dags-folder-2 dag processor. It fetches the dag_1 callback and crashes while trying to process it, because `self._dag_bundles` contains only the bundle passed via `-B`.
   
   ```
   mysql> select * from callback\G
   *************************** 1. row ***************************
        created_at: 2025-10-22 21:42:18.735975
   priority_weight: 1
              type: dag_processor
      fetch_method: dag_attribute
              data: {"__var": {"req_data": "{\"filepath\":\"dag_1.py\",\"bundle_name\":\"dags-folder-1\",\"bundle_version\":null,\"msg\":null,\"ti\":{\"id\":\"019a0cb0-6cff-76dc-ae43-315ebcbd3de2\",\"task_id\":\"task1\",\"dag_id\":\"dag_1\",\"run_id\":\"manual__2025-10-22T16:11:19+00:00\",\"try_number\":1,\"dag_version_id\":\"019a0ca8-606b-7fdc-bf38-0a0e59af7c03\",\"map_index\":-1,\"hostname\":\"laptop\",\"context_carrier\":null},\"task_callback_type\":\"success\",\"context_from_server\":null,\"type\":\"TaskCallbackRequest\"}", "req_class": "TaskCallbackRequest"}, "__type": "dict"}
             state: NULL
            output: NULL
        trigger_id: NULL
                id: 019a0cb151af773488c25c6afd5b57c2
   1 row in set (0.00 sec)
   ```
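
   Decoding the `data` column shows the mismatch directly: the callback's `req_data` names `dags-folder-1`, a bundle the dags-folder-2 processor never loads. A quick check (the payload below is abridged from the row above to the relevant fields):

   ```python
   import json

   # `data` column from the callback row, abridged to the fields that matter here.
   data = {
       "__var": {
           "req_data": '{"filepath":"dag_1.py","bundle_name":"dags-folder-1","type":"TaskCallbackRequest"}',
           "req_class": "TaskCallbackRequest",
       },
       "__type": "dict",
   }

   req = json.loads(data["__var"]["req_data"])
   print(req["bundle_name"])  # -> dags-folder-1, which this processor does not have
   ```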
   
   
   ```
   [2025-10-22T16:06:49.847784Z] {dag_processor_job_runner.py:63} ERROR - Exception when executing DagProcessorJob
   Traceback (most recent call last):
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
       self.processor.run()
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 272, in run
       return self._run_parsing_loop()
              ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 357, in _run_parsing_loop
       self._start_new_processes()
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 929, in _start_new_processes
       processor = self._create_process(file)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 908, in _create_process
       logger, logger_filehandle = self._get_logger_for_dag_file(dag_file)
                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 886, in _get_logger_for_dag_file
       log_filename = self._render_log_filename(dag_file)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 881, in _render_log_filename
       bundle = next(b for b in self._dag_bundles if b.name == dag_file.bundle_name)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   StopIteration
   [2025-10-22T16:06:49.866068Z] {supervisor.py:712} INFO - Process exited pid=17796 exit_code=<Negsignal.SIGTERM: -15> signal_sent=SIGTERM
   [2025-10-22T16:06:49.877181Z] {process_utils.py:285} INFO - Waiting up to 5 seconds for processes to exit...
   [2025-10-22T16:06:49.885516Z] {listener.py:37} DEBUG - Calling 'before_stopping' with {'component': <airflow.jobs.job.Job object at 0x7f6408975f50>}
   [2025-10-22T16:06:49.885927Z] {listener.py:38} DEBUG - Hook impls: []
   [2025-10-22T16:06:49.886071Z] {listener.py:42} DEBUG - Result from 'before_stopping': []
   [2025-10-22T16:06:49.896775Z] {cli_action_loggers.py:98} DEBUG - Calling callbacks: []
   Traceback (most recent call last):
     File "/home/karthikeyan/stuff/python/airflow/.venv/bin/airflow", line 10, in <module>
       sys.exit(main())
                ^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/__main__.py", line 55, in main
       args.func(args)
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/cli_config.py", line 49, in command
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/cli.py", line 115, in wrapper
       return f(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/commands/dag_processor_command.py", line 53, in dag_processor
       run_command_with_daemon_option(
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
       callback()
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/cli/commands/dag_processor_command.py", line 56, in <lambda>
       callback=lambda: run_job(job=job_runner.job, execute_callable=job_runner._execute),
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/utils/session.py", line 100, in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/job.py", line 367, in run_job
       return execute_job(job, execute_callable=execute_callable)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/job.py", line 396, in execute_job
       ret = execute_callable()
             ^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
       self.processor.run()
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 272, in run
       return self._run_parsing_loop()
              ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 357, in _run_parsing_loop
       self._start_new_processes()
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 929, in _start_new_processes
       processor = self._create_process(file)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 908, in _create_process
       logger, logger_filehandle = self._get_logger_for_dag_file(dag_file)
                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 886, in _get_logger_for_dag_file
       log_filename = self._render_log_filename(dag_file)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/dag_processing/manager.py", line 881, in _render_log_filename
       bundle = next(b for b in self._dag_bundles if b.name == dag_file.bundle_name)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   StopIteration
   ```
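
   One possible direction for a fix, sketched only as an illustration (this is not the actual Airflow change; `self.log` and the `dag_file.bundle_name` attribute follow the traceback above): give `next()` a default and skip work for bundles this sharded processor was not started with, instead of killing the parsing loop.

   ```python
   # Hypothetical guard for the lookup in _render_log_filename: fall back to
   # None instead of letting next() raise StopIteration, and skip files or
   # callbacks that belong to a bundle this dag processor did not load.
   bundle = next(
       (b for b in self._dag_bundles if b.name == dag_file.bundle_name), None
   )
   if bundle is None:
       self.log.warning(
           "Bundle %r is not loaded by this dag processor; skipping",
           dag_file.bundle_name,
       )
       return None
   ```

   An alternative would be for a sharded processor to filter callbacks by bundle name when fetching them from the db, so it never dequeues another shard's work.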
   
   The trigger and operator shared by both dags:
   
   ```python
   # cat ~/airflow/plugins/custom_trigger.py 
   
   import asyncio
   
   from airflow.sdk import BaseOperator
   from airflow.triggers.base import BaseTrigger, TriggerEvent, TaskSuccessEvent
   
   
   class CustomTrigger(BaseTrigger):
       def __init__(self):
           super().__init__()
   
       def serialize(self):
           return (self.__class__.__module__ + "." + self.__class__.__name__, 
{})
   
       async def run(self):
           yield TaskSuccessEvent()
   
   
   class CustomOperator(BaseOperator):
       def __init__(self, *args, **kwargs):
           super().__init__(*args, **kwargs)
   
       def execute(self, context, current_item_index=0, event=None):
           self.defer(
               trigger=CustomTrigger(),
               method_name="execute_complete",
           )
   
       def execute_complete(self, *args, **kwargs):
           pass
   ```
   
   ```python
   # cat ~/airflow/dags1/dag_1.py 
   
   from datetime import datetime
   from airflow.sdk import DAG
   from airflow.providers.standard.operators.bash import BashOperator
   from custom_trigger import CustomOperator
   
   
   with DAG(
       dag_id="dag_1",
       schedule="@continuous",
       max_active_runs=1,
       catchup=False
   ):
       task1 = CustomOperator(task_id="task1")
   ```
   
   ```python
   # cat ~/airflow/dags2/dag_2.py 
   
   from datetime import datetime
   from airflow.sdk import DAG
   from airflow.providers.standard.operators.bash import BashOperator
   from custom_trigger import CustomOperator
   
   
   with DAG(
       dag_id="dag_2",
       schedule="@continuous",
       max_active_runs=1,
       catchup=False
   ):
       task1 = CustomOperator(task_id="task1")
   ```
   
   airflow.cfg:
   
   ```cfg
   [dag_processor]
   dag_bundle_config_list = [
       {
         "name": "dags-folder-1",
         "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
         "kwargs": {"path": "/home/karthikeyan/airflow/dags1"}
       },
       {
         "name": "dags-folder-2",
         "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
         "kwargs": {"path": "/home/karthikeyan/airflow/dags2"}
       }
      ]
   ```
   
   ### Operating System
   
   Ubuntu 20.04
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

