This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 95cef76 Fix Celery Tests (#12166)
95cef76 is described below
commit 95cef76eae50a79716e42519c6e44359feb302c4
Author: Kaxil Naik <[email protected]>
AuthorDate: Sat Nov 7 19:24:30 2020 +0000
Fix Celery Tests (#12166)
Celery tests on Master are failing and then timing out on Postgres and
MySQL.
Stacktrace (link:
https://github.com/apache/airflow/runs/1367123586#step:6:4860)
```
> self.task_publish_retries.pop(key)
E KeyError: ('success', 'fake_simple_ti', datetime.datetime(2020,
11, 7, 8, 0, 13, 62424), 0)
```
This commit fixes the error introduced in
https://github.com/apache/airflow/pull/12140
---
tests/executors/test_celery_executor.py | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/tests/executors/test_celery_executor.py
b/tests/executors/test_celery_executor.py
index d844cbf..64f0ca2 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -45,7 +45,6 @@ from airflow.operators.bash import BashOperator
from airflow.utils import timezone
from airflow.utils.state import State
from tests.test_utils import db
-from tests.test_utils.config import conf_vars
def _prepare_test_bodies():
@@ -143,6 +142,7 @@ class TestCeleryExecutor(unittest.TestCase):
# "Enqueue" them. We don't have a real SimpleTaskInstance, so
directly edit the dict
for (key, simple_ti, command, queue, task) in
task_tuples_to_send: # pylint: disable=W0612
executor.queued_tasks[key] = (command, 1, queue, simple_ti)
+ executor.task_publish_retries[key] = 1
executor._process_tasks(task_tuples_to_send)
@@ -196,6 +196,7 @@ class TestCeleryExecutor(unittest.TestCase):
)
key = ('fail', 'fake_simple_ti', when, 0)
executor.queued_tasks[key] = value_tuple
+ executor.task_publish_retries[key] = 1
executor.heartbeat()
self.assertEqual(0, len(executor.queued_tasks), "Task should no longer
be queued")
self.assertEqual(executor.event_buffer[('fail', 'fake_simple_ti',
when, 0)][0], State.FAILED)
@@ -203,14 +204,15 @@ class TestCeleryExecutor(unittest.TestCase):
@pytest.mark.integration("redis")
@pytest.mark.integration("rabbitmq")
@pytest.mark.backend("mysql", "postgres")
- @conf_vars({("celery", "operation_timeout"): "0.01"})
def test_retry_on_error_sending_task(self):
"""Test that Airflow retries publishing tasks to Celery Broker atleast
3 times"""
def fake_execute_command(command):
print(command)
- with _prepare_app(execute=fake_execute_command),
self.assertLogs(celery_executor.log) as cm:
+ with _prepare_app(execute=fake_execute_command), self.assertLogs(
+ celery_executor.log
+ ) as cm, mock.patch.object(celery_executor, "OPERATION_TIMEOUT",
0.001):
# fake_execute_command takes no arguments while execute_command
takes 1,
# which will cause TypeError when calling task.apply_async()
executor = celery_executor.CeleryExecutor()