avkirilishin opened a new issue, #33785:
URL: https://github.com/apache/airflow/issues/33785

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   Airflow 2.4.2 (but I suspect other versions are affected too)
   
   I noticed that sometimes DAG- and task-level callbacks don't run. At the same time, some tasks hang in the "queued" state and need manual restarts. After some investigation, I determined that this happens when DagFileProcessorManager is relaunched.
   
   Here's what I think happens:
   1. DagFileProcessorManager receives requests with callbacks (including failure callbacks) from the scheduler and [puts them into a dict](https://github.com/apache/airflow/blob/3ba994d8f4c4b5ce3828bebcff28bbfc25170004/airflow/dag_processing/manager.py#L717-L751).
   2. Then something goes wrong, and DagFileProcessorManager terminates.
   3. DagFileProcessorAgent [relaunches DagFileProcessorManager](https://github.com/apache/airflow/blob/3ba994d8f4c4b5ce3828bebcff28bbfc25170004/airflow/dag_processing/manager.py#L286-L294).
   4. DagFileProcessorManager starts from scratch with an empty [callback dict](https://github.com/apache/airflow/blob/3ba994d8f4c4b5ce3828bebcff28bbfc25170004/airflow/dag_processing/manager.py#L447).
   5. Because tasks are marked as failed [only in the DAG parsing process](https://github.com/apache/airflow/blob/3ba994d8f4c4b5ce3828bebcff28bbfc25170004/tests/jobs/test_scheduler_job.py#L371-L372) (after their callbacks are executed), some [tasks with on_failure_callback or on_retry_callback](https://github.com/apache/airflow/blob/3ba994d8f4c4b5ce3828bebcff28bbfc25170004/airflow/jobs/scheduler_job_runner.py#L782-L791) can stay in the "queued" state. For example, I saw a sensor task that failed due to a network error (the task could not be sent to the workers) and then got stuck in the "queued" state.
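
   The loss in step 4 can be illustrated with a minimal, self-contained sketch (a toy stand-in, not Airflow's actual classes): callback requests queued only in the manager's in-memory dict vanish when the process is replaced.

```python
from collections import defaultdict

class ToyManager:
    """Toy stand-in for DagFileProcessorManager: callback requests live
    only in an in-memory dict, so they die with the process."""

    def __init__(self):
        # Every (re)launch starts with an empty dict (step 4 above).
        self._callback_to_execute = defaultdict(list)

    def add_callback(self, file_path, request):
        # Requests from the scheduler are queued in memory only (step 1).
        self._callback_to_execute[file_path].append(request)

manager = ToyManager()
manager.add_callback("/dags/example.py", "on_failure_callback for task 'test'")
assert manager._callback_to_execute["/dags/example.py"]

# Steps 2-3: the manager terminates and the agent relaunches it.
manager = ToyManager()

# Step 5: the queued failure callback is gone; since the task is only
# marked failed after its callback runs, it stays "queued" forever.
assert not manager._callback_to_execute["/dags/example.py"]
```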
   
   ### What you think should happen instead
   
   At a minimum, the scheduler could mark failed tasks that have on_failure_callback or on_retry_callback as failed, as it already does for all other tasks. That would prevent them from getting stuck.
   
   It would be even better if the other callbacks weren't lost when DagFileProcessorManager is relaunched.
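
   One possible direction (a hypothetical sketch under my own naming, not Airflow's actual API) is to persist callback requests in a durable store, so a relaunched manager can reload anything that was never executed instead of starting from an empty dict:

```python
import json
import sqlite3

class DurableCallbackQueue:
    """Hypothetical sketch: write each callback request to a durable store
    before queuing it, so a relaunched manager can replay pending ones."""

    def __init__(self, conn):
        self.conn = conn
        conn.execute(
            "CREATE TABLE IF NOT EXISTS callback "
            "(id INTEGER PRIMARY KEY AUTOINCREMENT, payload TEXT NOT NULL)"
        )

    def enqueue(self, request):
        # Write-ahead: the request survives a manager crash.
        self.conn.execute(
            "INSERT INTO callback (payload) VALUES (?)", (json.dumps(request),)
        )
        self.conn.commit()

    def pending(self):
        # On startup, the relaunched manager reloads unexecuted requests.
        rows = self.conn.execute("SELECT payload FROM callback ORDER BY id")
        return [json.loads(payload) for (payload,) in rows]

# The "first" manager queues a failure callback, then dies before running it.
conn = sqlite3.connect(":memory:")  # a shared on-disk file in real use
DurableCallbackQueue(conn).enqueue({"task_id": "test", "is_failure_callback": True})

# The "relaunched" manager sees the pending request instead of an empty dict.
assert DurableCallbackQueue(conn).pending() == [
    {"task_id": "test", "is_failure_callback": True}
]
```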
   
   ### How to reproduce
   
   It's hard to reproduce, but you can do something like this:
   
   Add the following line [here](https://github.com/apache/airflow/blob/3ba994d8f4c4b5ce3828bebcff28bbfc25170004/airflow/dag_processing/manager.py#L575):
   ```python
   self.log.info("Callbacks to execute: %s", self._callback_to_execute)
   ```
   
   Then run:
   ```python
   from datetime import datetime, timedelta
   import pathlib
   import time
   
   from airflow.callbacks.callback_requests import TaskCallbackRequest
   from airflow.configuration import conf
   from airflow.dag_processing.manager import DagFileProcessorAgent
   from airflow.models import DAG, TaskInstance
   from airflow.models.taskinstance import SimpleTaskInstance
   from airflow.operators.bash import BashOperator
   from airflow.utils.state import State
   
   TEST_DAG_FOLDER = pathlib.Path(__file__).parents[1].resolve() / 'dags'
   test_dag_path = TEST_DAG_FOLDER / 'fake_dags.py'
   async_mode = 'sqlite' not in conf.get('database', 'sql_alchemy_conn')
   processor_agent = DagFileProcessorAgent(
       test_dag_path, -1, timedelta(days=365), [], False, async_mode
   )
   processor_agent.start()
   
   # Build a task instance and send its failure callback to the manager.
   ti = TaskInstance(
       task=BashOperator(
           task_id="test",
           bash_command="true",
           dag=DAG(dag_id='id'),
           start_date=datetime.now(),
       ),
       run_id="fake_run",
       state=State.RUNNING,
   )
   request = TaskCallbackRequest(
       full_filepath="filepath",
       simple_task_instance=SimpleTaskInstance.from_ti(ti=ti),
       processor_subdir='/test_dir',
       is_failure_callback=True,
   )
   processor_agent.get_callbacks_pipe().send(request)
   
   # Kill the manager while the callback is still queued in its dict ...
   time.sleep(5)
   processor_agent._process.kill()
   
   # ... then let the agent relaunch it: the callback dict starts empty.
   processor_agent.heartbeat()
   time.sleep(5)
   processor_agent._process.kill()
   ```
   
   Then check the dag_processor_manager log to see that `_callback_to_execute` contained a callback and then became empty.
   
   ### Operating System
   
   CentOS Linux 7
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

