danxian-baiheng opened a new issue, #45606: URL: https://github.com/apache/airflow/issues/45606
### Apache Airflow Provider(s)

standard

### Versions of Apache Airflow Providers

apache-airflow-providers-daskexecutor==1.1.1
apache-airflow-providers-fab==1.4.1
apache-airflow-providers-ftp==3.11.1
apache-airflow-providers-http==4.13.1
apache-airflow-providers-imap==3.7.0
apache-airflow-providers-smtp==1.8.0
apache-airflow-providers-sqlite==3.9.0
apache-airflow-providers-common-compat==1.2.1
apache-airflow-providers-common-io==1.4.2
apache-airflow-providers-common-sql==1.18.0

### Apache Airflow version

2.10.2

### Operating System

Debian GNU/Linux 12 (bookworm)

### Deployment

Other Docker-based deployment

### Deployment details

Kubernetes: v1.28 GoVersion:"go1.15.9"

### What happened

```
./dask_executor.log-2025-01-12 08:30:47,368 - distributed.worker - INFO - Closing worker gracefully: tcp://{IP:port}. Reason: worker-lifetime-reached
./dask_executor.log-[2025-01-12T15:30:47.438+0000][271075][MainProcess] {{task_command.py:467}} INFO - Running <TaskInstance: {dag_id}.{task_id}.import manual__2025-01-12T15:28:42+00:00 [queued]> on host
./dask_executor.log-2025-01-12 08:30:47,497 - distributed.worker - INFO - Stopping worker at tcp://{IP:port}. Reason: worker-lifetime-reached
./dask_executor.log:2025-01-12 08:30:47,498 - distributed.worker.state_machine - WARNING - Async instruction for <Task cancelled name='execute("TaskInstanceKey(dag_id=\'...\', task_id=\'...\', run_id=\'manual__2025-01-12T15:28:42+00:00\', try_number=1, map_index=-1)")' coro=<Worker.execute() done, defined at /opt/nucolumnaradmin/.venv/lib/python3.11/site-packages/distributed/worker_state_machine.py:3610>> ended with CancelledError
./dask_executor.log-2025-01-12 08:30:47,499 - distributed.core - INFO - Connection to ... has been closed.
./dask_executor.log-2025-01-12 08:30:55,887 - distributed.nanny - INFO - Worker closed
```

As shown in the logs above, the Dask worker triggered a graceful restart (`worker-lifetime-reached`) while an Airflow task was still running. After that, the task's status in the Dask executor stays 'pending' and never changes. The running task is a shell command started with [airflow run ...]; it keeps running and eventually marks itself as SUCCESS in Airflow's metadata DB.

```
...
[2025-01-12T15:30:54.978+0000][271103][MainProcess] {{taskinstance.py:352}} INFO - Marking task as SUCCESS. dag_id=Ads_partition_replacement_v2, task_id=replica_SLC_555432.import, run_id=manual__2025-01-12T15:28:42+00:00, execution_date=20250112T152842, start_date=20250112T153048, end_date=20250112T153054
[2025-01-12T15:30:55.048+0000][271075][MainProcess] {{local_task_job_runner.py:266}} INFO - Task exited with return code 0
```

The timestamps in the two logs are actually the same instant rendered in different time zones. At this point the task's status in the Airflow DB is SUCCESS, so it will not be scheduled again. Meanwhile, its status in the Dask executor (inside the Airflow scheduler process) stays 'pending' until the scheduler is restarted, which reduces `executor_slots_available`; once `executor_slots_available` reaches zero, the Airflow scheduler stops working.

### What you think should happen instead

Obviously the state above is incorrect, and it looks like a leak bug: the executor never releases the stale entry.

### How to reproduce

Maybe Airflow should sync the task status held in the Dask executor with the metadata DB and clean up succeeded tasks, or just make sure the Dask executor receives the same status as the command that actually ran (a rough sketch of such a reconciliation is included at the end of this report).

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!
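For illustration only, below is a minimal sketch of the kind of reconciliation suggested above: a periodic pass that compares the keys the executor still tracks as running against the task states recorded in the metadata DB, and reports terminal states back to the executor so the slots are released. This is not Airflow's or the Dask provider's actual implementation; `reconcile_executor_with_db` is a hypothetical helper, and it assumes the executor keeps its in-flight keys in `BaseExecutor.running` as in Airflow 2.x.

```python
# Hypothetical reconciliation sketch (not part of Airflow or the Dask provider).
# Drop executor-side entries for tasks that the metadata DB already finalized,
# so a worker restart cannot pin a slot forever.
from airflow.executors.base_executor import BaseExecutor
from airflow.models.taskinstance import TaskInstance
from airflow.utils.session import create_session
from airflow.utils.state import TaskInstanceState


def reconcile_executor_with_db(executor: BaseExecutor) -> None:
    """Release executor slots for tasks the metadata DB already marked terminal."""
    with create_session() as session:
        # Copy the set because reporting a state mutates executor.running.
        for key in list(executor.running):
            ti = (
                session.query(TaskInstance)
                .filter(
                    TaskInstance.dag_id == key.dag_id,
                    TaskInstance.task_id == key.task_id,
                    TaskInstance.run_id == key.run_id,
                    TaskInstance.map_index == key.map_index,
                )
                .one_or_none()
            )
            if ti is None:
                continue
            if ti.state == TaskInstanceState.SUCCESS:
                # The task already finished on the worker side even though the
                # Dask future was cancelled by the graceful restart.
                executor.success(key)
            elif ti.state == TaskInstanceState.FAILED:
                executor.fail(key)
```

Reporting the state via `executor.success(key)` / `executor.fail(key)` goes through the executor's normal state-change path, which removes the key from its running set and should therefore free the slot counted toward `executor_slots_available`.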
### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)