This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to branch v2-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 317dd298e02f54156f362f8d39307e575106b5b5 Author: Ephraim Anierobi <[email protected]> AuthorDate: Mon Feb 7 10:43:51 2022 +0100 Filter celery stuck task query to exclude completed tasks (#21335) On testing #19769, it was reported that there was a spike in CPU usage https://github.com/apache/airflow/pull/19769#issuecomment-1029755436 Hopefully, this will fix it (cherry picked from commit a49224fa7ce45e9765c0d752edc30430e0d3ce14) --- airflow/executors/celery_executor.py | 7 ++++++- tests/executors/test_celery_executor.py | 8 ++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 8daced6..9185f1b 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -404,7 +404,12 @@ class CeleryExecutor(BaseExecutor): session_ = app.backend.ResultSession() task_cls = getattr(app.backend, "task_cls", TaskDb) with session_cleanup(session_): - celery_task_ids = [t.task_id for t in session_.query(task_cls.task_id).all()] + celery_task_ids = [ + t.task_id + for t in session_.query(task_cls.task_id) + .filter(~task_cls.status.in_([celery_states.SUCCESS, celery_states.FAILURE])) + .all() + ] self.log.debug("Checking for stuck queued tasks") max_allowed_time = utcnow() - self.task_adoption_timeout diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py index 5632f7d..f057440 100644 --- a/tests/executors/test_celery_executor.py +++ b/tests/executors/test_celery_executor.py @@ -42,7 +42,7 @@ from parameterized import parameterized from airflow.configuration import conf from airflow.exceptions import AirflowException, AirflowTaskTimeout from airflow.executors import celery_executor -from airflow.executors.celery_executor import BulkStateFetcher +from airflow.executors.celery_executor import BulkStateFetcher, CeleryExecutor from airflow.models.baseoperator import BaseOperator from airflow.models.dag import DAG from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, TaskInstanceKey @@ -532,6 +532,7 @@ class TestCeleryExecutor: assert ti.state == state @mock.patch("celery.backends.database.DatabaseBackend.ResultSession") + @mock.patch.object(CeleryExecutor, "update_all_task_states") @pytest.mark.backend("mysql", "postgres") @freeze_time("2020-01-01") @pytest.mark.parametrize( @@ -543,6 +544,7 @@ class TestCeleryExecutor: ) def test_the_check_interval_to_clear_stuck_queued_task_is_correct_for_db_query( self, + mock_update_all_task_states, mock_result_session, task_id, state, @@ -557,7 +559,9 @@ class TestCeleryExecutor: mock_backend = DatabaseBackend(app=celery_executor.app, url="sqlite3://") with mock.patch('airflow.executors.celery_executor.Celery.backend', mock_backend): mock_session = mock_backend.ResultSession.return_value - mock_session.query.return_value.all.return_value = [result_obj("SUCCESS", task_id)] + mock_session.query.return_value.filter.return_value.all.return_value = [ + result_obj("SUCCESS", task_id) + ] last_check_time = time.time() - 302 # should clear ti state
