avkirilishin opened a new issue, #33785: URL: https://github.com/apache/airflow/issues/33785
### Apache Airflow version

Other Airflow 2 version (please specify below)

### What happened

Airflow 2.4.2 (but I suppose other versions are affected too). I noticed that some DAGs' and tasks' callbacks sometimes don't run. At the same time, some tasks hung in the "queued" state and needed manual restarts. After some investigation, I determined that it happens during the re-launching of DagFileProcessorManager. Here's what I think happens:

1. DagFileProcessorManager receives requests with callbacks (including failure callbacks) from the scheduler and [puts them into a dict](https://github.com/apache/airflow/blob/3ba994d8f4c4b5ce3828bebcff28bbfc25170004/airflow/dag_processing/manager.py#L717-L751).
2. Then something goes wrong, and DagFileProcessorManager terminates.
3. DagFileProcessorAgent [relaunches DagFileProcessorManager](https://github.com/apache/airflow/blob/3ba994d8f4c4b5ce3828bebcff28bbfc25170004/airflow/dag_processing/manager.py#L286-L294).
4. The new DagFileProcessorManager starts from scratch with an empty [callback dict](https://github.com/apache/airflow/blob/3ba994d8f4c4b5ce3828bebcff28bbfc25170004/airflow/dag_processing/manager.py#L447).
5. Because tasks are marked as failed [only in the DAG parsing process](https://github.com/apache/airflow/blob/3ba994d8f4c4b5ce3828bebcff28bbfc25170004/tests/jobs/test_scheduler_job.py#L371-L372) (after their callbacks are executed), some [tasks with on_failure_callback or on_retry_callback](https://github.com/apache/airflow/blob/3ba994d8f4c4b5ce3828bebcff28bbfc25170004/airflow/jobs/scheduler_job_runner.py#L782-L791) can stay in the "queued" state. For example, I saw a sensor task that failed due to a network error (tasks could not be sent to the workers) and then got stuck in the "queued" state.

### What you think should happen instead

At the very least, we could mark failed tasks that have on_failure_callback or on_retry_callback as failed in the scheduler, as we already do for all other tasks. That would prevent tasks from getting stuck.
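The failure mode in the steps above can be illustrated with a toy model. This is a deliberately simplified sketch, not the real Airflow classes: `FakeManager` and `FakeAgent` are invented stand-ins whose only purpose is to show that a callback dict kept purely in the manager's process memory cannot survive a relaunch.

```python
# Toy model of the bug: pending callback requests live only in an in-process
# dict, so relaunching the manager silently discards them.
# FakeManager / FakeAgent are invented stand-ins, not Airflow classes.
from collections import defaultdict


class FakeManager:
    """Stand-in for DagFileProcessorManager: callbacks exist only in memory."""

    def __init__(self):
        # Mirrors the role of DagFileProcessorManager._callback_to_execute
        self._callback_to_execute = defaultdict(list)

    def add_callback(self, filepath, request):
        self._callback_to_execute[filepath].append(request)


class FakeAgent:
    """Stand-in for DagFileProcessorAgent: relaunches a fresh manager."""

    def __init__(self):
        self.manager = FakeManager()

    def heartbeat(self):
        # Step 3: the agent notices the manager died and starts a new one.
        # Nothing carries the old callback dict over to the new process.
        self.manager = FakeManager()


agent = FakeAgent()
agent.manager.add_callback("/dags/my_dag.py", "on_failure_callback for task X")
assert agent.manager._callback_to_execute["/dags/my_dag.py"]

agent.heartbeat()  # manager "crashes" and is relaunched (steps 2-4)
assert not agent.manager._callback_to_execute  # the pending callback is gone
```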
But it would be even better if other callbacks weren't lost after the re-launching of DagFileProcessorManager.

### How to reproduce

It's very hard to reproduce, but you can do something like this. Add this code [here](https://github.com/apache/airflow/blob/3ba994d8f4c4b5ce3828bebcff28bbfc25170004/airflow/dag_processing/manager.py#L575):

```python
self.log.info("Callbacks to execute: %s", self._callback_to_execute)
```

Then run:

```python
import pathlib
import time
from datetime import datetime, timedelta

from airflow.callbacks.callback_requests import TaskCallbackRequest
from airflow.configuration import conf
from airflow.dag_processing.manager import DagFileProcessorAgent
from airflow.models import DAG, TaskInstance
from airflow.models.taskinstance import SimpleTaskInstance
from airflow.operators.bash import BashOperator
from airflow.utils.state import State

TEST_DAG_FOLDER = pathlib.Path(__file__).parents[1].resolve() / "dags"
test_dag_path = TEST_DAG_FOLDER / "fake_dags.py"
async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn")

processor_agent = DagFileProcessorAgent(
    test_dag_path, -1, timedelta(days=365), [], False, async_mode
)
processor_agent.start()

TI = TaskInstance(
    task=BashOperator(
        task_id="test",
        bash_command="true",
        dag=DAG(dag_id="id"),
        start_date=datetime.now(),
    ),
    run_id="fake_run",
    state=State.RUNNING,
)
request = TaskCallbackRequest(
    full_filepath="filepath",
    simple_task_instance=SimpleTaskInstance.from_ti(ti=TI),
    processor_subdir="/test_dir",
    is_failure_callback=True,
)
processor_agent.get_callbacks_pipe().send(request)

time.sleep(5)
processor_agent._process.kill()  # simulate the manager dying
processor_agent.heartbeat()      # the agent relaunches the manager
time.sleep(5)
processor_agent._process.kill()
```

Then check the dag_processor_manager log to see that `_callback_to_execute` contained a callback and then became empty.
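One possible direction for not losing callbacks across a relaunch is to journal pending requests to durable storage before processing them, so a freshly started manager can reload whatever its predecessor left behind. The sketch below is purely illustrative: `CallbackJournal` is an invented name, the JSON-lines format is an assumption, and a real fix would have to live inside DagFileProcessorAgent/DagFileProcessorManager (or hand the callbacks to the database, as the standalone DAG processor setup does).

```python
# Hypothetical mitigation sketch (not Airflow code): persist pending callback
# requests so a relaunched manager can replay them. All names here are invented.
import json
import pathlib
import tempfile


class CallbackJournal:
    """Append-only JSON-lines journal of pending callback requests."""

    def __init__(self, path):
        self.path = pathlib.Path(path)

    def append(self, request):
        # Record each request on disk *before* it is handed to a processor.
        with self.path.open("a") as f:
            f.write(json.dumps(request) + "\n")

    def load_pending(self):
        # A freshly launched manager replays whatever is still journaled.
        if not self.path.exists():
            return []
        return [json.loads(line) for line in self.path.read_text().splitlines() if line]

    def mark_done(self, request):
        # Rewrite the journal without the completed request.
        pending = [r for r in self.load_pending() if r != request]
        self.path.write_text("".join(json.dumps(r) + "\n" for r in pending))


path = pathlib.Path(tempfile.mkdtemp()) / "callbacks.jsonl"
CallbackJournal(path).append({"task_id": "test", "is_failure_callback": True})

# Simulate a crash + relaunch: a brand-new journal object for the same file
# still sees the pending request, so the callback is not silently dropped.
relaunched = CallbackJournal(path)
assert relaunched.load_pending() == [{"task_id": "test", "is_failure_callback": True}]
relaunched.mark_done({"task_id": "test", "is_failure_callback": True})
assert relaunched.load_pending() == []
```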
### Operating System

CentOS Linux 7

### Versions of Apache Airflow Providers

_No response_

### Deployment

Virtualenv installation

### Deployment details

_No response_

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
