This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4dea367047 Add unit test to cover back compat case in celery (#40035)
4dea367047 is described below

commit 4dea367047eb86831fbaed337f4dbddc3d995e9b
Author: Niko Oliveira <[email protected]>
AuthorDate: Wed Jun 5 11:03:19 2024 -0700

    Add unit test to cover back compat case in celery (#40035)
    
    A unit test which triggers the scenario of a current Celery executor
    with the new signature of change_state run against an older version of
    Airflow with the old signature of change_state on the BaseExecutor
    class.
    
    related #40011
    related #39980
    related #40012
---
 .../integration/executors/test_celery_executor.py  | 26 +++++++++++++++++++++-
 1 file changed, 25 insertions(+), 1 deletion(-)

diff --git a/tests/integration/executors/test_celery_executor.py 
b/tests/integration/executors/test_celery_executor.py
index 64c0a4adc4..5893438d42 100644
--- a/tests/integration/executors/test_celery_executor.py
+++ b/tests/integration/executors/test_celery_executor.py
@@ -23,6 +23,7 @@ import logging
 import os
 import sys
 from datetime import datetime
+from importlib import reload
 from time import sleep
 from unittest import mock
 
@@ -37,10 +38,12 @@ from kombu.asynchronous import set_event_loop
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowTaskTimeout
+from airflow.executors import base_executor
 from airflow.models.dag import DAG
 from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
+from airflow.models.taskinstancekey import TaskInstanceKey
 from airflow.operators.bash import BashOperator
-from airflow.utils.state import State
+from airflow.utils.state import State, TaskInstanceState
 from tests.test_utils import db
 
 logger = logging.getLogger(__name__)
@@ -107,6 +110,27 @@ class TestCeleryExecutor:
         db.clear_db_runs()
         db.clear_db_jobs()
 
+    def test_change_state_back_compat(self):
+        # This represents the old implementation that an Airflow package may 
have
+        def _change_state(self, key: TaskInstanceKey, state: 
TaskInstanceState, info=None) -> None:
+            pass
+
+        # Replace change_state function on base executor with the old version 
to force the backcompat edge
+        # case we're looking for
+        base_executor.BaseExecutor.change_state = _change_state
+        # Create an instance of celery executor while the base executor is 
modified
+        from airflow.providers.celery.executors import celery_executor
+
+        executor = celery_executor.CeleryExecutor()
+
+        # This will throw an exception if the backcompat is not properly 
handled
+        executor.change_state(
+            key=TaskInstanceKey("foo", "bar", "baz"), 
state=TaskInstanceState.QUEUED, info="test"
+        )
+        # Restore the base executor and celery modules
+        reload(base_executor)
+        reload(celery_executor)
+
     @pytest.mark.flaky(reruns=3)
     @pytest.mark.parametrize("broker_url", _prepare_test_bodies())
     def test_celery_integration(self, broker_url):

Reply via email to