uranusjr commented on a change in pull request #19769:
URL: https://github.com/apache/airflow/pull/19769#discussion_r756613516



##########
File path: tests/executors/test_celery_executor.py
##########
@@ -411,6 +404,107 @@ def test_check_for_stalled_adopted_tasks_goes_in_ordered_fashion(self):
         assert executor.running == {key_2}
         assert executor.adopted_task_timeouts == {key_2: queued_dttm_2 + executor.task_adoption_timeout}
 
+    @pytest.mark.backend("mysql", "postgres")
+    @pytest.mark.parametrize(
+        "state, queued_dttm, executor_id",
+        [
+            (State.SCHEDULED, timezone.utcnow() - timedelta(days=2), '231'),
+            (State.QUEUED, timezone.utcnow(), '231'),
+            (State.QUEUED, timezone.utcnow(), None),
+        ],
+    )
+    def test_stuck_queued_tasks_are_cleared(
+        self, state, queued_dttm, executor_id, session, dag_maker, create_dummy_dag, create_task_instance
+    ):
+        """Test that clear_stuck_queued_tasks works"""
+        ti = create_task_instance(state=State.QUEUED)
+        ti.queued_dttm = queued_dttm
+        ti.external_executor_id = executor_id
+        session.merge(ti)
+        session.flush()
+        executor = celery_executor.CeleryExecutor()
+        executor._clear_stuck_queued_tasks()
+        session.flush()
+        ti = session.query(TaskInstance).filter(TaskInstance.task_id == ti.task_id).one()
+        assert ti.state == state
+
+    @pytest.mark.backend("mysql", "postgres")
+    def test_task_in_queued_tasks_dict_are_not_cleared(
+        self, session, dag_maker, create_dummy_dag, create_task_instance
+    ):
+        """Test that clear_stuck_queued_tasks doesn't clear tasks in executor.queued_tasks"""
+        ti = create_task_instance(state=State.QUEUED)
+        ti.queued_dttm = timezone.utcnow() - timedelta(days=2)
+        ti.external_executor_id = '231'
+        session.merge(ti)
+        session.flush()
+        executor = celery_executor.CeleryExecutor()
+        executor.queued_tasks = {ti.key: AsyncResult("231")}
+        executor._clear_stuck_queued_tasks()
+        session.flush()
+        ti = session.query(TaskInstance).filter(TaskInstance.task_id == ti.task_id).one()
+        assert ti.state == State.QUEUED
+
+    @pytest.mark.backend("mysql", "postgres")
+    def test_task_in_running_dict_are_not_cleared(
+        self, session, dag_maker, create_dummy_dag, create_task_instance
+    ):
+        """Test that clear_stuck_queued_tasks doesn't clear tasks in executor.running"""
+        ti = create_task_instance(state=State.QUEUED)
+        ti.queued_dttm = timezone.utcnow() - timedelta(days=2)
+        ti.external_executor_id = '231'
+        session.merge(ti)
+        session.flush()
+        executor = celery_executor.CeleryExecutor()
+        executor.running = {ti.key: AsyncResult("231")}
+        executor._clear_stuck_queued_tasks()
+        session.flush()
+        ti = session.query(TaskInstance).filter(TaskInstance.task_id == ti.task_id).one()
+        assert ti.state == State.QUEUED
+
+    @pytest.mark.backend("mysql", "postgres")
+    @mock.patch("airflow.executors.celery_executor.AsyncResult")
+    def test_task_in_not_pending_in_asyncresults_are_not_cleared(
+        self, asyncresult_mock, session, dag_maker, create_dummy_dag, create_task_instance
+    ):
+        """Test that task in other state aside pending in AsyncResult are not cleared"""
+        ti = create_task_instance(state=State.QUEUED)
+        ti.queued_dttm = timezone.utcnow() - timedelta(days=2)
+        ti.external_executor_id = '231'
+        session.merge(ti)
+        session.flush()
+        executor = celery_executor.CeleryExecutor()
+        executor._clear_stuck_queued_tasks()
+        session.flush()
+        ti = session.query(TaskInstance).filter(TaskInstance.task_id == ti.task_id).one()
+        # Since AsyncResult is mocked, the returned_value is not PENDING
+        # We end up not clearing the task
+        assert ti.state == State.QUEUED
+
+    @pytest.mark.backend("mysql", "postgres")
+    @pytest.mark.parametrize(
+        "last_check_time, state",
+        [
+            (time.time() - 400, State.SCHEDULED),
+            (time.time() - 200, State.QUEUED),

Review comment:
       Consider using `freezegun` here—calculating this against the actual time may cause issues if the test suite is running slowly: these `time.time()` calls are evaluated at import time, but the test may not actually start immediately after import (for example, if the test runner decides to run other tests first).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to