This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4dea367047 Add unit test to cover back compat case in celery (#40035)
4dea367047 is described below
commit 4dea367047eb86831fbaed337f4dbddc3d995e9b
Author: Niko Oliveira <[email protected]>
AuthorDate: Wed Jun 5 11:03:19 2024 -0700
Add unit test to cover back compat case in celery (#40035)
A unit test which triggers the scenario of a current Celery executor
with the new signature of change_state run against an older version of
Airflow with the old signature of change_state on the BaseExecutor
class.
related #40011
related #39980
related #40012
---
.../integration/executors/test_celery_executor.py | 26 +++++++++++++++++++++-
1 file changed, 25 insertions(+), 1 deletion(-)
diff --git a/tests/integration/executors/test_celery_executor.py
b/tests/integration/executors/test_celery_executor.py
index 64c0a4adc4..5893438d42 100644
--- a/tests/integration/executors/test_celery_executor.py
+++ b/tests/integration/executors/test_celery_executor.py
@@ -23,6 +23,7 @@ import logging
import os
import sys
from datetime import datetime
+from importlib import reload
from time import sleep
from unittest import mock
@@ -37,10 +38,12 @@ from kombu.asynchronous import set_event_loop
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowTaskTimeout
+from airflow.executors import base_executor
from airflow.models.dag import DAG
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
+from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.operators.bash import BashOperator
-from airflow.utils.state import State
+from airflow.utils.state import State, TaskInstanceState
from tests.test_utils import db
logger = logging.getLogger(__name__)
@@ -107,6 +110,27 @@ class TestCeleryExecutor:
db.clear_db_runs()
db.clear_db_jobs()
+ def test_change_state_back_compat(self):
+ # This represents the old implementation that an Airflow package may
have
+ def _change_state(self, key: TaskInstanceKey, state:
TaskInstanceState, info=None) -> None:
+ pass
+
+ # Replace change_state function on base executor with the old version
to force the backcompat edge
+ # case we're looking for
+ base_executor.BaseExecutor.change_state = _change_state
+ # Create an instance of celery executor while the base executor is
modified
+ from airflow.providers.celery.executors import celery_executor
+
+ executor = celery_executor.CeleryExecutor()
+
+ # This will throw an exception if the backcompat is not properly
handled
+ executor.change_state(
+ key=TaskInstanceKey("foo", "bar", "baz"),
state=TaskInstanceState.QUEUED, info="test"
+ )
+ # Restore the base executor and celery modules
+ reload(base_executor)
+ reload(celery_executor)
+
@pytest.mark.flaky(reruns=3)
@pytest.mark.parametrize("broker_url", _prepare_test_bodies())
def test_celery_integration(self, broker_url):