This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 729bb64827 Attempt to stabilize flaky celery integration test (#39892)
729bb64827 is described below
commit 729bb64827c9effda1a45d3339ad88dfd86e09af
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue May 28 12:00:26 2024 +0200
Attempt to stabilize flaky celery integration test (#39892)
---
tests/integration/executors/test_celery_executor.py | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
diff --git a/tests/integration/executors/test_celery_executor.py
b/tests/integration/executors/test_celery_executor.py
index 72e9ca9e10..2b72d2e91d 100644
--- a/tests/integration/executors/test_celery_executor.py
+++ b/tests/integration/executors/test_celery_executor.py
@@ -23,6 +23,7 @@ import logging
import os
import sys
from datetime import datetime
+from time import sleep
from unittest import mock
# leave this it is used by the test worker
@@ -42,6 +43,8 @@ from airflow.operators.bash import BashOperator
from airflow.utils.state import State
from tests.test_utils import db
+logger = logging.getLogger(__name__)
+
def _prepare_test_bodies():
if "CELERY_BROKER_URLS" in os.environ:
@@ -145,7 +148,15 @@ class TestCeleryExecutor:
executor.task_publish_retries[key] = 1
executor._process_tasks(task_tuples_to_send)
-
+ for _ in range(20):
+ num_tasks = len(executor.tasks.keys())
+ if num_tasks == 2:
+ break
+ logger.info(
+ "Waiting 0.1 s for tasks to be processed
asynchronously. Processed so far %d",
+ num_tasks,
+ )
+ sleep(0.4)
assert list(executor.tasks.keys()) == [
("success", "fake_simple_ti", execute_date, 0),
("fail", "fake_simple_ti", execute_date, 0),