This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 95cef76  Fix Celery Tests (#12166)
95cef76 is described below

commit 95cef76eae50a79716e42519c6e44359feb302c4
Author: Kaxil Naik <[email protected]>
AuthorDate: Sat Nov 7 19:24:30 2020 +0000

    Fix Celery Tests (#12166)
    
    Celery tests on Master are failing and then timing out on Postgres and MySQL.
    
    Stacktrace (link: https://github.com/apache/airflow/runs/1367123586#step:6:4860)
    ```
    >           self.task_publish_retries.pop(key)
    E           KeyError: ('success', 'fake_simple_ti', datetime.datetime(2020, 11, 7, 8, 0, 13, 62424), 0)
    ```
    
    This commit fixes the error introduced in https://github.com/apache/airflow/pull/12140
---
 tests/executors/test_celery_executor.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/tests/executors/test_celery_executor.py 
b/tests/executors/test_celery_executor.py
index d844cbf..64f0ca2 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -45,7 +45,6 @@ from airflow.operators.bash import BashOperator
 from airflow.utils import timezone
 from airflow.utils.state import State
 from tests.test_utils import db
-from tests.test_utils.config import conf_vars
 
 
 def _prepare_test_bodies():
@@ -143,6 +142,7 @@ class TestCeleryExecutor(unittest.TestCase):
                 # "Enqueue" them. We don't have a real SimpleTaskInstance, so 
directly edit the dict
                 for (key, simple_ti, command, queue, task) in 
task_tuples_to_send:  # pylint: disable=W0612
                     executor.queued_tasks[key] = (command, 1, queue, simple_ti)
+                    executor.task_publish_retries[key] = 1
 
                 executor._process_tasks(task_tuples_to_send)
 
@@ -196,6 +196,7 @@ class TestCeleryExecutor(unittest.TestCase):
             )
             key = ('fail', 'fake_simple_ti', when, 0)
             executor.queued_tasks[key] = value_tuple
+            executor.task_publish_retries[key] = 1
             executor.heartbeat()
         self.assertEqual(0, len(executor.queued_tasks), "Task should no longer 
be queued")
         self.assertEqual(executor.event_buffer[('fail', 'fake_simple_ti', 
when, 0)][0], State.FAILED)
@@ -203,14 +204,15 @@ class TestCeleryExecutor(unittest.TestCase):
     @pytest.mark.integration("redis")
     @pytest.mark.integration("rabbitmq")
     @pytest.mark.backend("mysql", "postgres")
-    @conf_vars({("celery", "operation_timeout"): "0.01"})
     def test_retry_on_error_sending_task(self):
         """Test that Airflow retries publishing tasks to Celery Broker atleast 
3 times"""
 
         def fake_execute_command(command):
             print(command)
 
-        with _prepare_app(execute=fake_execute_command), 
self.assertLogs(celery_executor.log) as cm:
+        with _prepare_app(execute=fake_execute_command), self.assertLogs(
+            celery_executor.log
+        ) as cm, mock.patch.object(celery_executor, "OPERATION_TIMEOUT", 
0.001):
             # fake_execute_command takes no arguments while execute_command 
takes 1,
             # which will cause TypeError when calling task.apply_async()
             executor = celery_executor.CeleryExecutor()

Reply via email to