This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
     new 78569832045 [v2-10-test] Fix `FileTaskHandler` only read from default 
executor (#46000)
78569832045 is described below

commit 78569832045b9c6119d601faa8d6b945f2c16009
Author: LIU ZHE YOU <[email protected]>
AuthorDate: Mon Jan 27 01:36:44 2025 +0800

    [v2-10-test] Fix `FileTaskHandler` only read from default executor (#46000)
    
    * [v2-10-test] Fix FileTaskHandler only read from default executor
    
    * Fix test_dag_executors side effect
---
 airflow/executors/executor_loader.py               |  4 +
 airflow/utils/log/file_task_handler.py             | 31 +++++--
 tests/conftest.py                                  | 12 +++
 tests/executors/test_executor_loader.py            | 88 ++++++++++---------
 tests/models/test_dag.py                           |  4 +-
 tests/test_utils/executor_loader.py                | 33 ++++++++
 tests/ti_deps/deps/test_ready_to_reschedule_dep.py |  1 +
 tests/utils/test_log_handlers.py                   | 99 +++++++++++++++++++++-
 8 files changed, 216 insertions(+), 56 deletions(-)

diff --git a/airflow/executors/executor_loader.py 
b/airflow/executors/executor_loader.py
index 3c088879061..7ad42a2fb1b 100644
--- a/airflow/executors/executor_loader.py
+++ b/airflow/executors/executor_loader.py
@@ -201,6 +201,10 @@ class ExecutorLoader:
     @classmethod
     def lookup_executor_name_by_str(cls, executor_name_str: str) -> 
ExecutorName:
         # lookup the executor by alias first, if not check if we're given a 
module path
+        if not _classname_to_executors or not _module_to_executors or not 
_alias_to_executors:
+            # if we haven't loaded the executors yet, such as directly calling 
load_executor
+            cls._get_executor_names()
+
         if executor_name := _alias_to_executors.get(executor_name_str):
             return executor_name
         elif executor_name := _module_to_executors.get(executor_name_str):
diff --git a/airflow/utils/log/file_task_handler.py 
b/airflow/utils/log/file_task_handler.py
index 9eb55c707f1..2df73ef4ffa 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -46,6 +46,7 @@ from airflow.utils.state import State, TaskInstanceState
 if TYPE_CHECKING:
     from pendulum import DateTime
 
+    from airflow.executors.base_executor import BaseExecutor
     from airflow.models import DagRun
     from airflow.models.taskinstance import TaskInstance
     from airflow.models.taskinstancekey import TaskInstanceKey
@@ -185,6 +186,8 @@ class FileTaskHandler(logging.Handler):
     inherits_from_empty_operator_log_message = (
         "Operator inherits from empty operator and thus does not have logs"
     )
+    executor_instances: dict[str, BaseExecutor] = {}
+    DEFAULT_EXECUTOR_KEY = "_default_executor"
 
     def __init__(
         self,
@@ -340,11 +343,26 @@ class FileTaskHandler(logging.Handler):
     def _read_grouped_logs(self):
         return False
 
-    @cached_property
-    def _executor_get_task_log(self) -> Callable[[TaskInstance, int], 
tuple[list[str], list[str]]]:
-        """This cached property avoids loading executor repeatedly."""
-        executor = ExecutorLoader.get_default_executor()
-        return executor.get_task_log
+    def _get_executor_get_task_log(
+        self, ti: TaskInstance
+    ) -> Callable[[TaskInstance, int], tuple[list[str], list[str]]]:
+        """
+        Get the get_task_log method from executor of current task instance.
+
+        Since there might be multiple executors, so we need to get the 
executor of current task instance instead of getting from default executor.
+        :param ti: task instance object
+        :return: get_task_log method of the executor
+        """
+        executor_name = ti.executor or self.DEFAULT_EXECUTOR_KEY
+        executor = self.executor_instances.get(executor_name)
+        if executor is not None:
+            return executor.get_task_log
+
+        if executor_name == self.DEFAULT_EXECUTOR_KEY:
+            self.executor_instances[executor_name] = 
ExecutorLoader.get_default_executor()
+        else:
+            self.executor_instances[executor_name] = 
ExecutorLoader.load_executor(executor_name)
+        return self.executor_instances[executor_name].get_task_log
 
     def _read(
         self,
@@ -386,7 +404,8 @@ class FileTaskHandler(logging.Handler):
             messages_list.extend(remote_messages)
         has_k8s_exec_pod = False
         if ti.state == TaskInstanceState.RUNNING:
-            response = self._executor_get_task_log(ti, try_number)
+            executor_get_task_log = self._get_executor_get_task_log(ti)
+            response = executor_get_task_log(ti, try_number)
             if response:
                 executor_messages, executor_logs = response
             if executor_messages:
diff --git a/tests/conftest.py b/tests/conftest.py
index 6d064fa0a9b..6c17c6f4036 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1487,3 +1487,15 @@ def clean_dags_and_dagruns():
     yield  # Test runs here
     clear_db_dags()
     clear_db_runs()
+
+
[email protected]
+def clean_executor_loader():
+    """Clean the executor_loader state, as it stores global variables in the 
module, causing side effects for some tests."""
+    from airflow.executors.executor_loader import ExecutorLoader
+    from tests.test_utils.executor_loader import clean_executor_loader_module
+
+    clean_executor_loader_module()
+    yield  # Test runs here
+    clean_executor_loader_module()
+    ExecutorLoader.init_executors()
diff --git a/tests/executors/test_executor_loader.py 
b/tests/executors/test_executor_loader.py
index 2192487a01c..dc60b9cc507 100644
--- a/tests/executors/test_executor_loader.py
+++ b/tests/executors/test_executor_loader.py
@@ -17,7 +17,6 @@
 from __future__ import annotations
 
 from contextlib import nullcontext
-from importlib import reload
 from unittest import mock
 
 import pytest
@@ -25,7 +24,7 @@ import pytest
 from airflow import plugins_manager
 from airflow.exceptions import AirflowConfigException
 from airflow.executors import executor_loader
-from airflow.executors.executor_loader import ConnectorSource, ExecutorLoader, 
ExecutorName
+from airflow.executors.executor_loader import ConnectorSource, ExecutorName
 from airflow.executors.local_executor import LocalExecutor
 from airflow.providers.amazon.aws.executors.ecs.ecs_executor import 
AwsEcsExecutor
 from airflow.providers.celery.executors.celery_executor import CeleryExecutor
@@ -50,24 +49,12 @@ class FakePlugin(plugins_manager.AirflowPlugin):
     executors = [FakeExecutor]
 
 
[email protected]("clean_executor_loader")
 class TestExecutorLoader:
-    def setup_method(self) -> None:
-        from airflow.executors import executor_loader
-
-        reload(executor_loader)
-        global ExecutorLoader
-        ExecutorLoader = executor_loader.ExecutorLoader  # type: ignore
-
-    def teardown_method(self) -> None:
-        from airflow.executors import executor_loader
-
-        reload(executor_loader)
-        ExecutorLoader.init_executors()
-
     def test_no_executor_configured(self):
         with conf_vars({("core", "executor"): None}):
             with pytest.raises(AirflowConfigException, match=r".*not found in 
config$"):
-                ExecutorLoader.get_default_executor()
+                executor_loader.ExecutorLoader.get_default_executor()
 
     @pytest.mark.parametrize(
         "executor_name",
@@ -81,18 +68,20 @@ class TestExecutorLoader:
     )
     def test_should_support_executor_from_core(self, executor_name):
         with conf_vars({("core", "executor"): executor_name}):
-            executor = ExecutorLoader.get_default_executor()
+            executor = executor_loader.ExecutorLoader.get_default_executor()
             assert executor is not None
             assert executor_name == executor.__class__.__name__
             assert executor.name is not None
-            assert executor.name == 
ExecutorName(ExecutorLoader.executors[executor_name], alias=executor_name)
+            assert executor.name == ExecutorName(
+                executor_loader.ExecutorLoader.executors[executor_name], 
alias=executor_name
+            )
             assert executor.name.connector_source == ConnectorSource.CORE
 
     @mock.patch("airflow.plugins_manager.plugins", [FakePlugin()])
     @mock.patch("airflow.plugins_manager.executors_modules", None)
     def test_should_support_plugins(self):
         with conf_vars({("core", "executor"): 
f"{TEST_PLUGIN_NAME}.FakeExecutor"}):
-            executor = ExecutorLoader.get_default_executor()
+            executor = executor_loader.ExecutorLoader.get_default_executor()
             assert executor is not None
             assert "FakeExecutor" == executor.__class__.__name__
             assert executor.name is not None
@@ -101,7 +90,7 @@ class TestExecutorLoader:
 
     def test_should_support_custom_path(self):
         with conf_vars({("core", "executor"): 
"tests.executors.test_executor_loader.FakeExecutor"}):
-            executor = ExecutorLoader.get_default_executor()
+            executor = executor_loader.ExecutorLoader.get_default_executor()
             assert executor is not None
             assert "FakeExecutor" == executor.__class__.__name__
             assert executor.name is not None
@@ -172,17 +161,17 @@ class TestExecutorLoader:
     )
     def test_get_hybrid_executors_from_config(self, executor_config, 
expected_executors_list):
         with conf_vars({("core", "executor"): executor_config}):
-            executors = ExecutorLoader._get_executor_names()
+            executors = executor_loader.ExecutorLoader._get_executor_names()
             assert executors == expected_executors_list
 
     def test_init_executors(self):
         with conf_vars({("core", "executor"): "CeleryExecutor"}):
-            executors = ExecutorLoader.init_executors()
-            executor_name = ExecutorLoader.get_default_executor_name()
+            executors = executor_loader.ExecutorLoader.init_executors()
+            executor_name = 
executor_loader.ExecutorLoader.get_default_executor_name()
             assert len(executors) == 1
             assert isinstance(executors[0], CeleryExecutor)
-            assert "CeleryExecutor" in ExecutorLoader.executors
-            assert ExecutorLoader.executors["CeleryExecutor"] == 
executor_name.module_path
+            assert "CeleryExecutor" in executor_loader.ExecutorLoader.executors
+            assert executor_loader.ExecutorLoader.executors["CeleryExecutor"] 
== executor_name.module_path
             assert 
isinstance(executor_loader._loaded_executors[executor_name], CeleryExecutor)
 
     @pytest.mark.parametrize(
@@ -202,7 +191,7 @@ class TestExecutorLoader:
             with pytest.raises(
                 AirflowConfigException, match=r".+Duplicate executors are not 
yet supported.+"
             ):
-                ExecutorLoader._get_executor_names()
+                executor_loader.ExecutorLoader._get_executor_names()
 
     @pytest.mark.parametrize(
         "executor_config",
@@ -218,7 +207,7 @@ class TestExecutorLoader:
     def 
test_get_hybrid_executors_from_config_core_executors_bad_config_format(self, 
executor_config):
         with conf_vars({("core", "executor"): executor_config}):
             with pytest.raises(AirflowConfigException):
-                ExecutorLoader._get_executor_names()
+                executor_loader.ExecutorLoader._get_executor_names()
 
     @pytest.mark.parametrize(
         ("executor_config", "expected_value"),
@@ -234,7 +223,7 @@ class TestExecutorLoader:
     )
     def test_should_support_import_executor_from_core(self, executor_config, 
expected_value):
         with conf_vars({("core", "executor"): executor_config}):
-            executor, import_source = 
ExecutorLoader.import_default_executor_cls()
+            executor, import_source = 
executor_loader.ExecutorLoader.import_default_executor_cls()
             assert expected_value == executor.__name__
             assert import_source == ConnectorSource.CORE
 
@@ -249,7 +238,7 @@ class TestExecutorLoader:
     )
     def test_should_support_import_plugins(self, executor_config):
         with conf_vars({("core", "executor"): executor_config}):
-            executor, import_source = 
ExecutorLoader.import_default_executor_cls()
+            executor, import_source = 
executor_loader.ExecutorLoader.import_default_executor_cls()
             assert "FakeExecutor" == executor.__name__
             assert import_source == ConnectorSource.PLUGIN
 
@@ -263,7 +252,7 @@ class TestExecutorLoader:
     )
     def test_should_support_import_custom_path(self, executor_config):
         with conf_vars({("core", "executor"): executor_config}):
-            executor, import_source = 
ExecutorLoader.import_default_executor_cls()
+            executor, import_source = 
executor_loader.ExecutorLoader.import_default_executor_cls()
             assert "FakeExecutor" == executor.__name__
             assert import_source == ConnectorSource.CUSTOM_PATH
 
@@ -272,7 +261,7 @@ class TestExecutorLoader:
     @pytest.mark.parametrize("executor", [FakeExecutor, 
FakeSingleThreadedExecutor])
     def test_validate_database_executor_compatibility_general(self, 
monkeypatch, executor):
         
monkeypatch.delenv("_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK")
-        ExecutorLoader.validate_database_executor_compatibility(executor)
+        
executor_loader.ExecutorLoader.validate_database_executor_compatibility(executor)
 
     @pytest.mark.db_test
     @pytest.mark.backend("sqlite")
@@ -290,24 +279,32 @@ class TestExecutorLoader:
     def test_validate_database_executor_compatibility_sqlite(self, 
monkeypatch, executor, expectation):
         
monkeypatch.delenv("_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK")
         with expectation:
-            ExecutorLoader.validate_database_executor_compatibility(executor)
+            
executor_loader.ExecutorLoader.validate_database_executor_compatibility(executor)
 
     def test_load_executor(self):
         with conf_vars({("core", "executor"): "LocalExecutor"}):
-            ExecutorLoader.init_executors()
-            assert isinstance(ExecutorLoader.load_executor("LocalExecutor"), 
LocalExecutor)
-            assert 
isinstance(ExecutorLoader.load_executor(executor_loader._executor_names[0]), 
LocalExecutor)
-            assert isinstance(ExecutorLoader.load_executor(None), 
LocalExecutor)
+            executor_loader.ExecutorLoader.init_executors()
+            assert 
isinstance(executor_loader.ExecutorLoader.load_executor("LocalExecutor"), 
LocalExecutor)
+            assert isinstance(
+                
executor_loader.ExecutorLoader.load_executor(executor_loader._executor_names[0]),
+                LocalExecutor,
+            )
+            assert 
isinstance(executor_loader.ExecutorLoader.load_executor(None), LocalExecutor)
 
     def test_load_executor_alias(self):
         with conf_vars({("core", "executor"): 
"local_exec:airflow.executors.local_executor.LocalExecutor"}):
-            ExecutorLoader.init_executors()
-            assert isinstance(ExecutorLoader.load_executor("local_exec"), 
LocalExecutor)
+            executor_loader.ExecutorLoader.init_executors()
+            assert 
isinstance(executor_loader.ExecutorLoader.load_executor("local_exec"), 
LocalExecutor)
+            assert isinstance(
+                executor_loader.ExecutorLoader.load_executor(
+                    "airflow.executors.local_executor.LocalExecutor"
+                ),
+                LocalExecutor,
+            )
             assert isinstance(
-                
ExecutorLoader.load_executor("airflow.executors.local_executor.LocalExecutor"),
+                
executor_loader.ExecutorLoader.load_executor(executor_loader._executor_names[0]),
                 LocalExecutor,
             )
-            assert 
isinstance(ExecutorLoader.load_executor(executor_loader._executor_names[0]), 
LocalExecutor)
 
     
@mock.patch("airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor",
 autospec=True)
     def test_load_custom_executor_with_classname(self, mock_executor):
@@ -319,15 +316,16 @@ class TestExecutorLoader:
                 ): 
"my_alias:airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor"
             }
         ):
-            ExecutorLoader.init_executors()
-            assert isinstance(ExecutorLoader.load_executor("my_alias"), 
AwsEcsExecutor)
-            assert isinstance(ExecutorLoader.load_executor("AwsEcsExecutor"), 
AwsEcsExecutor)
+            executor_loader.ExecutorLoader.init_executors()
+            assert 
isinstance(executor_loader.ExecutorLoader.load_executor("my_alias"), 
AwsEcsExecutor)
+            assert 
isinstance(executor_loader.ExecutorLoader.load_executor("AwsEcsExecutor"), 
AwsEcsExecutor)
             assert isinstance(
-                ExecutorLoader.load_executor(
+                executor_loader.ExecutorLoader.load_executor(
                     
"airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor"
                 ),
                 AwsEcsExecutor,
             )
             assert isinstance(
-                
ExecutorLoader.load_executor(executor_loader._executor_names[0]), AwsEcsExecutor
+                
executor_loader.ExecutorLoader.load_executor(executor_loader._executor_names[0]),
+                AwsEcsExecutor,
             )
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index f23eac76b6e..5f721b61d26 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -28,7 +28,6 @@ import warnings
 import weakref
 from contextlib import redirect_stdout
 from datetime import timedelta
-from importlib import reload
 from io import StringIO
 from pathlib import Path
 from typing import TYPE_CHECKING
@@ -56,7 +55,6 @@ from airflow.exceptions import (
     RemovedInAirflow3Warning,
     UnknownExecutorException,
 )
-from airflow.executors import executor_loader
 from airflow.executors.local_executor import LocalExecutor
 from airflow.executors.sequential_executor import SequentialExecutor
 from airflow.models.baseoperator import BaseOperator
@@ -3324,10 +3322,10 @@ class TestDagModel:
             ]
         }
 
+    @pytest.mark.usefixtures("clean_executor_loader")
     @mock.patch("airflow.models.dag.run_job")
     def test_dag_executors(self, run_job_mock):
         dag = DAG(dag_id="test", schedule=None)
-        reload(executor_loader)
         with conf_vars({("core", "executor"): "SequentialExecutor"}):
             dag.run()
             assert 
isinstance(run_job_mock.call_args_list[0].kwargs["job"].executor, 
SequentialExecutor)
diff --git a/tests/test_utils/executor_loader.py 
b/tests/test_utils/executor_loader.py
new file mode 100644
index 00000000000..cc28223b7ce
--- /dev/null
+++ b/tests/test_utils/executor_loader.py
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import airflow.executors.executor_loader as executor_loader
+
+if TYPE_CHECKING:
+    from airflow.executors.executor_utils import ExecutorName
+
+
+def clean_executor_loader_module():
+    """Clean the executor_loader state, as it stores global variables in the 
module, causing side effects for some tests."""
+    executor_loader._alias_to_executors: dict[str, ExecutorName] = {}
+    executor_loader._module_to_executors: dict[str, ExecutorName] = {}
+    executor_loader._classname_to_executors: dict[str, ExecutorName] = {}
+    executor_loader._executor_names: list[ExecutorName] = []
diff --git a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py 
b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py
index 568d6abf025..9241145f7f5 100644
--- a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py
+++ b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py
@@ -48,6 +48,7 @@ def not_expected_tr_db_call():
         yield m
 
 
[email protected]("clean_executor_loader")
 class TestNotInReschedulePeriodDep:
     @pytest.fixture(autouse=True)
     def setup_test_cases(self, request, create_task_instance):
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index d3651370d65..95483f2285f 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -34,7 +34,7 @@ from requests.adapters import Response
 
 from airflow.config_templates.airflow_local_settings import 
DEFAULT_LOGGING_CONFIG
 from airflow.exceptions import RemovedInAirflow3Warning
-from airflow.executors import executor_loader
+from airflow.executors import executor_constants, executor_loader
 from airflow.jobs.job import Job
 from airflow.jobs.triggerer_job_runner import TriggererJobRunner
 from airflow.models.dag import DAG
@@ -202,6 +202,95 @@ class TestFileTaskLogHandler:
         # Remove the generated tmp log file.
         os.remove(log_filename)
 
+    @pytest.mark.parametrize(
+        "executor_name",
+        [
+            (executor_constants.LOCAL_KUBERNETES_EXECUTOR),
+            (executor_constants.CELERY_KUBERNETES_EXECUTOR),
+            (executor_constants.KUBERNETES_EXECUTOR),
+            (None),
+        ],
+    )
+    @conf_vars(
+        {
+            ("core", "EXECUTOR"): ",".join(
+                [
+                    executor_constants.LOCAL_KUBERNETES_EXECUTOR,
+                    executor_constants.CELERY_KUBERNETES_EXECUTOR,
+                    executor_constants.KUBERNETES_EXECUTOR,
+                ]
+            ),
+        }
+    )
+    @patch(
+        "airflow.executors.executor_loader.ExecutorLoader.load_executor",
+        wraps=executor_loader.ExecutorLoader.load_executor,
+    )
+    @patch(
+        
"airflow.executors.executor_loader.ExecutorLoader.get_default_executor",
+        wraps=executor_loader.ExecutorLoader.get_default_executor,
+    )
+    def test_file_task_handler_with_multiple_executors(
+        self,
+        mock_get_default_executor,
+        mock_load_executor,
+        executor_name,
+        create_task_instance,
+        clean_executor_loader,
+    ):
+        executors_mapping = executor_loader.ExecutorLoader.executors
+        default_executor_name = 
executor_loader.ExecutorLoader.get_default_executor_name()
+        path_to_executor_class: str
+        if executor_name is None:
+            path_to_executor_class = 
executors_mapping.get(default_executor_name.alias)
+        else:
+            path_to_executor_class = executors_mapping.get(executor_name)
+
+        with patch(f"{path_to_executor_class}.get_task_log", return_value=([], 
[])) as mock_get_task_log:
+            mock_get_task_log.return_value = ([], [])
+            ti = create_task_instance(
+                dag_id="dag_for_testing_multiple_executors",
+                task_id="task_for_testing_multiple_executors",
+                run_type=DagRunType.SCHEDULED,
+                execution_date=DEFAULT_DATE,
+            )
+            if executor_name is not None:
+                ti.executor = executor_name
+            ti.try_number = 1
+            ti.state = TaskInstanceState.RUNNING
+            logger = ti.log
+            ti.log.disabled = False
+
+            file_handler = next(
+                (handler for handler in logger.handlers if handler.name == 
FILE_TASK_HANDLER), None
+            )
+            assert file_handler is not None
+
+            set_context(logger, ti)
+            # clear executor_instances cache
+            file_handler.executor_instances = {}
+            assert file_handler.handler is not None
+            # We expect set_context generates a file locally.
+            log_filename = file_handler.handler.baseFilename
+            assert os.path.isfile(log_filename)
+            assert log_filename.endswith("1.log"), log_filename
+
+            file_handler.flush()
+            file_handler.close()
+
+            assert hasattr(file_handler, "read")
+            file_handler.read(ti)
+            os.remove(log_filename)
+            mock_get_task_log.assert_called_once()
+
+            if executor_name is None:
+                mock_get_default_executor.assert_called_once()
+                # will be called in `ExecutorLoader.get_default_executor` 
method
+                
mock_load_executor.assert_called_once_with(default_executor_name)
+            else:
+                mock_get_default_executor.assert_not_called()
+                mock_load_executor.assert_called_once_with(executor_name)
+
     def test_file_task_handler_running(self):
         def task_callable(ti):
             ti.log.info("test")
@@ -296,6 +385,7 @@ class TestFileTaskLogHandler:
     @mock.patch(
         
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.get_task_log"
     )
+    @pytest.mark.usefixtures("clean_executor_loader")
     @pytest.mark.parametrize("state", [TaskInstanceState.RUNNING, 
TaskInstanceState.SUCCESS])
     def test__read_for_k8s_executor(self, mock_k8s_get_task_log, 
create_task_instance, state):
         """Test for k8s executor, the log is read from get_task_log method"""
@@ -309,6 +399,7 @@ class TestFileTaskLogHandler:
         )
         ti.state = state
         ti.triggerer_job = None
+        ti.executor = executor_name
         with conf_vars({("core", "executor"): executor_name}):
             reload(executor_loader)
             fth = FileTaskHandler("")
@@ -401,11 +492,12 @@ class TestFileTaskLogHandler:
             
pytest.param(k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="pod-name-xxx")), 
"default"),
         ],
     )
-    @patch.dict("os.environ", AIRFLOW__CORE__EXECUTOR="KubernetesExecutor")
+    @conf_vars({("core", "executor"): "KubernetesExecutor"})
     @patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
     def test_read_from_k8s_under_multi_namespace_mode(
         self, mock_kube_client, pod_override, namespace_to_call
     ):
+        reload(executor_loader)
         mock_read_log = mock_kube_client.return_value.read_namespaced_pod_log
         mock_list_pod = mock_kube_client.return_value.list_namespaced_pod
 
@@ -426,6 +518,7 @@ class TestFileTaskLogHandler:
         )
         ti = TaskInstance(task=task, run_id=dagrun.run_id)
         ti.try_number = 3
+        ti.executor = "KubernetesExecutor"
 
         logger = ti.log
         ti.log.disabled = False
@@ -434,6 +527,8 @@ class TestFileTaskLogHandler:
         set_context(logger, ti)
         ti.run(ignore_ti_state=True)
         ti.state = TaskInstanceState.RUNNING
+        # clear executor_instances cache
+        file_handler.executor_instances = {}
         file_handler.read(ti, 2)
 
         # first we find pod name

Reply via email to