This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new 78569832045 [v2-10-test] Fix `FileTaskHandler` only read from default executor (#46000)
78569832045 is described below
commit 78569832045b9c6119d601faa8d6b945f2c16009
Author: LIU ZHE YOU <[email protected]>
AuthorDate: Mon Jan 27 01:36:44 2025 +0800
[v2-10-test] Fix `FileTaskHandler` only read from default executor (#46000)
* [v2-10-test] Fix FileTaskHandler only read from default executor
* Fix test_dag_executors side effect
---
airflow/executors/executor_loader.py | 4 +
airflow/utils/log/file_task_handler.py | 31 +++++--
tests/conftest.py | 12 +++
tests/executors/test_executor_loader.py | 88 ++++++++++---------
tests/models/test_dag.py | 4 +-
tests/test_utils/executor_loader.py | 33 ++++++++
tests/ti_deps/deps/test_ready_to_reschedule_dep.py | 1 +
tests/utils/test_log_handlers.py | 99 +++++++++++++++++++++-
8 files changed, 216 insertions(+), 56 deletions(-)
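
In short: `FileTaskHandler` used to resolve its log-fetching callable once, via a `cached_property` bound to the default executor, so in a multi-executor deployment the live logs of tasks running on any other executor could not be read. The patch replaces that with a per-task-instance lookup plus a small cache. A minimal sketch of the pattern, outside the class (the `ExecutorLoader` calls are the real Airflow API; the standalone function and variable names here are illustrative only):

    from airflow.executors.executor_loader import ExecutorLoader

    _DEFAULT_KEY = "_default_executor"  # mirrors FileTaskHandler.DEFAULT_EXECUTOR_KEY
    _instances = {}  # mirrors FileTaskHandler.executor_instances

    def get_task_log_callable(ti):
        # Resolve the executor that actually runs this task instance,
        # falling back to the default executor when ti.executor is unset.
        name = ti.executor or _DEFAULT_KEY
        if name not in _instances:
            if name == _DEFAULT_KEY:
                _instances[name] = ExecutorLoader.get_default_executor()
            else:
                _instances[name] = ExecutorLoader.load_executor(name)
        return _instances[name].get_task_log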
diff --git a/airflow/executors/executor_loader.py b/airflow/executors/executor_loader.py
index 3c088879061..7ad42a2fb1b 100644
--- a/airflow/executors/executor_loader.py
+++ b/airflow/executors/executor_loader.py
@@ -201,6 +201,10 @@ class ExecutorLoader:
     @classmethod
     def lookup_executor_name_by_str(cls, executor_name_str: str) -> ExecutorName:
         # lookup the executor by alias first, if not check if we're given a module path
+        if not _classname_to_executors or not _module_to_executors or not _alias_to_executors:
+            # if we haven't loaded the executors yet, such as directly calling load_executor
+            cls._get_executor_names()
+
         if executor_name := _alias_to_executors.get(executor_name_str):
             return executor_name
         elif executor_name := _module_to_executors.get(executor_name_str):
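
The new guard matters because `lookup_executor_name_by_str` can now be reached before `init_executors()` has populated the module-level lookup tables, for example when `FileTaskHandler` calls `load_executor` directly. A hedged example of a call that previously depended on prior initialization, assuming a stock single-executor configuration:

    from airflow.executors.executor_loader import ExecutorLoader

    # With the guard above, the lookup tables are populated on demand,
    # so no explicit ExecutorLoader.init_executors() call is required first.
    executor = ExecutorLoader.load_executor("LocalExecutor")
    print(type(executor).__name__)  # -> LocalExecutor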
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 9eb55c707f1..2df73ef4ffa 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -46,6 +46,7 @@ from airflow.utils.state import State, TaskInstanceState
 if TYPE_CHECKING:
     from pendulum import DateTime

+    from airflow.executors.base_executor import BaseExecutor
     from airflow.models import DagRun
     from airflow.models.taskinstance import TaskInstance
     from airflow.models.taskinstancekey import TaskInstanceKey
@@ -185,6 +186,8 @@ class FileTaskHandler(logging.Handler):
     inherits_from_empty_operator_log_message = (
         "Operator inherits from empty operator and thus does not have logs"
     )
+    executor_instances: dict[str, BaseExecutor] = {}
+    DEFAULT_EXECUTOR_KEY = "_default_executor"

     def __init__(
         self,
@@ -340,11 +343,26 @@ class FileTaskHandler(logging.Handler):
     def _read_grouped_logs(self):
         return False

-    @cached_property
-    def _executor_get_task_log(self) -> Callable[[TaskInstance, int], tuple[list[str], list[str]]]:
-        """This cached property avoids loading executor repeatedly."""
-        executor = ExecutorLoader.get_default_executor()
-        return executor.get_task_log
+    def _get_executor_get_task_log(
+        self, ti: TaskInstance
+    ) -> Callable[[TaskInstance, int], tuple[list[str], list[str]]]:
+        """
+        Get the get_task_log method from the executor of the current task instance.
+
+        Since there might be multiple executors, we need the executor of the current task instance rather than the default executor.
+        :param ti: task instance object
+        :return: get_task_log method of the executor
+        """
+        executor_name = ti.executor or self.DEFAULT_EXECUTOR_KEY
+        executor = self.executor_instances.get(executor_name)
+        if executor is not None:
+            return executor.get_task_log
+
+        if executor_name == self.DEFAULT_EXECUTOR_KEY:
+            self.executor_instances[executor_name] = ExecutorLoader.get_default_executor()
+        else:
+            self.executor_instances[executor_name] = ExecutorLoader.load_executor(executor_name)
+        return self.executor_instances[executor_name].get_task_log

     def _read(
         self,
@@ -386,7 +404,8 @@ class FileTaskHandler(logging.Handler):
                     messages_list.extend(remote_messages)
         has_k8s_exec_pod = False
         if ti.state == TaskInstanceState.RUNNING:
-            response = self._executor_get_task_log(ti, try_number)
+            executor_get_task_log = self._get_executor_get_task_log(ti)
+            response = executor_get_task_log(ti, try_number)
             if response:
                 executor_messages, executor_logs = response
                 if executor_messages:
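
To exercise the new resolution path in isolation: a task instance pinned to a non-default executor should be routed through `ExecutorLoader.load_executor` rather than `get_default_executor`. A minimal sketch, assuming an Airflow installation and using a `SimpleNamespace` as a stand-in for a `TaskInstance`:

    from types import SimpleNamespace
    from unittest.mock import patch

    from airflow.utils.log.file_task_handler import FileTaskHandler

    handler = FileTaskHandler("/tmp/logs")  # illustrative base log folder
    handler.executor_instances = {}  # start from an empty cache
    fake_ti = SimpleNamespace(executor="CeleryExecutor")  # stand-in task instance

    with patch("airflow.executors.executor_loader.ExecutorLoader.load_executor") as mock_load:
        handler._get_executor_get_task_log(fake_ti)
        mock_load.assert_called_once_with("CeleryExecutor")  # non-default executor used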
diff --git a/tests/conftest.py b/tests/conftest.py
index 6d064fa0a9b..6c17c6f4036 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1487,3 +1487,15 @@ def clean_dags_and_dagruns():
     yield  # Test runs here
     clear_db_dags()
     clear_db_runs()
+
+
[email protected]
+def clean_executor_loader():
+    """Clean the executor_loader state, as it stores global variables in the module, causing side effects for some tests."""
+    from airflow.executors.executor_loader import ExecutorLoader
+    from tests.test_utils.executor_loader import clean_executor_loader_module
+
+    clean_executor_loader_module()
+    yield  # Test runs here
+    clean_executor_loader_module()
+    ExecutorLoader.init_executors()
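
A hypothetical test showing how the fixture is consumed; it resets the executor_loader module state before the test and restores a clean, initialized state afterwards:

    import pytest

    @pytest.mark.usefixtures("clean_executor_loader")
    def test_touches_executor_state():
        from airflow.executors.executor_loader import ExecutorLoader

        # Lookup tables start empty here and are cleaned up again after the test.
        ExecutorLoader.init_executors()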
diff --git a/tests/executors/test_executor_loader.py b/tests/executors/test_executor_loader.py
index 2192487a01c..dc60b9cc507 100644
--- a/tests/executors/test_executor_loader.py
+++ b/tests/executors/test_executor_loader.py
@@ -17,7 +17,6 @@
 from __future__ import annotations

 from contextlib import nullcontext
-from importlib import reload
 from unittest import mock

 import pytest
@@ -25,7 +24,7 @@ import pytest
 from airflow import plugins_manager
 from airflow.exceptions import AirflowConfigException
 from airflow.executors import executor_loader
-from airflow.executors.executor_loader import ConnectorSource, ExecutorLoader, ExecutorName
+from airflow.executors.executor_loader import ConnectorSource, ExecutorName
 from airflow.executors.local_executor import LocalExecutor
 from airflow.providers.amazon.aws.executors.ecs.ecs_executor import AwsEcsExecutor
 from airflow.providers.celery.executors.celery_executor import CeleryExecutor
@@ -50,24 +49,12 @@ class FakePlugin(plugins_manager.AirflowPlugin):
     executors = [FakeExecutor]


[email protected]("clean_executor_loader")
 class TestExecutorLoader:
-    def setup_method(self) -> None:
-        from airflow.executors import executor_loader
-
-        reload(executor_loader)
-        global ExecutorLoader
-        ExecutorLoader = executor_loader.ExecutorLoader  # type: ignore
-
-    def teardown_method(self) -> None:
-        from airflow.executors import executor_loader
-
-        reload(executor_loader)
-        ExecutorLoader.init_executors()
-
     def test_no_executor_configured(self):
         with conf_vars({("core", "executor"): None}):
             with pytest.raises(AirflowConfigException, match=r".*not found in config$"):
-                ExecutorLoader.get_default_executor()
+                executor_loader.ExecutorLoader.get_default_executor()

     @pytest.mark.parametrize(
         "executor_name",
@@ -81,18 +68,20 @@ class TestExecutorLoader:
     )
     def test_should_support_executor_from_core(self, executor_name):
         with conf_vars({("core", "executor"): executor_name}):
-            executor = ExecutorLoader.get_default_executor()
+            executor = executor_loader.ExecutorLoader.get_default_executor()
             assert executor is not None
             assert executor_name == executor.__class__.__name__
             assert executor.name is not None
-            assert executor.name == ExecutorName(ExecutorLoader.executors[executor_name], alias=executor_name)
+            assert executor.name == ExecutorName(
+                executor_loader.ExecutorLoader.executors[executor_name], alias=executor_name
+            )
             assert executor.name.connector_source == ConnectorSource.CORE

     @mock.patch("airflow.plugins_manager.plugins", [FakePlugin()])
     @mock.patch("airflow.plugins_manager.executors_modules", None)
     def test_should_support_plugins(self):
         with conf_vars({("core", "executor"): f"{TEST_PLUGIN_NAME}.FakeExecutor"}):
-            executor = ExecutorLoader.get_default_executor()
+            executor = executor_loader.ExecutorLoader.get_default_executor()
             assert executor is not None
             assert "FakeExecutor" == executor.__class__.__name__
             assert executor.name is not None
@@ -101,7 +90,7 @@ class TestExecutorLoader:

     def test_should_support_custom_path(self):
         with conf_vars({("core", "executor"): "tests.executors.test_executor_loader.FakeExecutor"}):
-            executor = ExecutorLoader.get_default_executor()
+            executor = executor_loader.ExecutorLoader.get_default_executor()
             assert executor is not None
             assert "FakeExecutor" == executor.__class__.__name__
             assert executor.name is not None
@@ -172,17 +161,17 @@ class TestExecutorLoader:
     )
     def test_get_hybrid_executors_from_config(self, executor_config, expected_executors_list):
         with conf_vars({("core", "executor"): executor_config}):
-            executors = ExecutorLoader._get_executor_names()
+            executors = executor_loader.ExecutorLoader._get_executor_names()
             assert executors == expected_executors_list

     def test_init_executors(self):
         with conf_vars({("core", "executor"): "CeleryExecutor"}):
-            executors = ExecutorLoader.init_executors()
-            executor_name = ExecutorLoader.get_default_executor_name()
+            executors = executor_loader.ExecutorLoader.init_executors()
+            executor_name = executor_loader.ExecutorLoader.get_default_executor_name()
             assert len(executors) == 1
             assert isinstance(executors[0], CeleryExecutor)
-            assert "CeleryExecutor" in ExecutorLoader.executors
-            assert ExecutorLoader.executors["CeleryExecutor"] == executor_name.module_path
+            assert "CeleryExecutor" in executor_loader.ExecutorLoader.executors
+            assert executor_loader.ExecutorLoader.executors["CeleryExecutor"] == executor_name.module_path
             assert isinstance(executor_loader._loaded_executors[executor_name], CeleryExecutor)
     @pytest.mark.parametrize(
@@ -202,7 +191,7 @@ class TestExecutorLoader:
         with pytest.raises(
             AirflowConfigException, match=r".+Duplicate executors are not yet supported.+"
         ):
-            ExecutorLoader._get_executor_names()
+            executor_loader.ExecutorLoader._get_executor_names()

     @pytest.mark.parametrize(
         "executor_config",
@@ -218,7 +207,7 @@ class TestExecutorLoader:
     def test_get_hybrid_executors_from_config_core_executors_bad_config_format(self, executor_config):
         with conf_vars({("core", "executor"): executor_config}):
             with pytest.raises(AirflowConfigException):
-                ExecutorLoader._get_executor_names()
+                executor_loader.ExecutorLoader._get_executor_names()
     @pytest.mark.parametrize(
         ("executor_config", "expected_value"),
@@ -234,7 +223,7 @@ class TestExecutorLoader:
     )
     def test_should_support_import_executor_from_core(self, executor_config, expected_value):
         with conf_vars({("core", "executor"): executor_config}):
-            executor, import_source = ExecutorLoader.import_default_executor_cls()
+            executor, import_source = executor_loader.ExecutorLoader.import_default_executor_cls()
             assert expected_value == executor.__name__
             assert import_source == ConnectorSource.CORE
@@ -249,7 +238,7 @@ class TestExecutorLoader:
     )
     def test_should_support_import_plugins(self, executor_config):
         with conf_vars({("core", "executor"): executor_config}):
-            executor, import_source = ExecutorLoader.import_default_executor_cls()
+            executor, import_source = executor_loader.ExecutorLoader.import_default_executor_cls()
             assert "FakeExecutor" == executor.__name__
             assert import_source == ConnectorSource.PLUGIN
@@ -263,7 +252,7 @@ class TestExecutorLoader:
     )
     def test_should_support_import_custom_path(self, executor_config):
         with conf_vars({("core", "executor"): executor_config}):
-            executor, import_source = ExecutorLoader.import_default_executor_cls()
+            executor, import_source = executor_loader.ExecutorLoader.import_default_executor_cls()
             assert "FakeExecutor" == executor.__name__
             assert import_source == ConnectorSource.CUSTOM_PATH
@@ -272,7 +261,7 @@ class TestExecutorLoader:
     @pytest.mark.parametrize("executor", [FakeExecutor, FakeSingleThreadedExecutor])
     def test_validate_database_executor_compatibility_general(self, monkeypatch, executor):
         monkeypatch.delenv("_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK")
-        ExecutorLoader.validate_database_executor_compatibility(executor)
+        executor_loader.ExecutorLoader.validate_database_executor_compatibility(executor)

     @pytest.mark.db_test
     @pytest.mark.backend("sqlite")
@@ -290,24 +279,32 @@ class TestExecutorLoader:
     def test_validate_database_executor_compatibility_sqlite(self, monkeypatch, executor, expectation):
         monkeypatch.delenv("_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK")
         with expectation:
-            ExecutorLoader.validate_database_executor_compatibility(executor)
+            executor_loader.ExecutorLoader.validate_database_executor_compatibility(executor)
     def test_load_executor(self):
         with conf_vars({("core", "executor"): "LocalExecutor"}):
-            ExecutorLoader.init_executors()
-            assert isinstance(ExecutorLoader.load_executor("LocalExecutor"), LocalExecutor)
-            assert isinstance(ExecutorLoader.load_executor(executor_loader._executor_names[0]), LocalExecutor)
-            assert isinstance(ExecutorLoader.load_executor(None), LocalExecutor)
+            executor_loader.ExecutorLoader.init_executors()
+            assert isinstance(executor_loader.ExecutorLoader.load_executor("LocalExecutor"), LocalExecutor)
+            assert isinstance(
+                executor_loader.ExecutorLoader.load_executor(executor_loader._executor_names[0]),
+                LocalExecutor,
+            )
+            assert isinstance(executor_loader.ExecutorLoader.load_executor(None), LocalExecutor)

     def test_load_executor_alias(self):
         with conf_vars({("core", "executor"): "local_exec:airflow.executors.local_executor.LocalExecutor"}):
-            ExecutorLoader.init_executors()
-            assert isinstance(ExecutorLoader.load_executor("local_exec"), LocalExecutor)
+            executor_loader.ExecutorLoader.init_executors()
+            assert isinstance(executor_loader.ExecutorLoader.load_executor("local_exec"), LocalExecutor)
+            assert isinstance(
+                executor_loader.ExecutorLoader.load_executor(
+                    "airflow.executors.local_executor.LocalExecutor"
+                ),
+                LocalExecutor,
+            )
             assert isinstance(
-                ExecutorLoader.load_executor("airflow.executors.local_executor.LocalExecutor"),
+                executor_loader.ExecutorLoader.load_executor(executor_loader._executor_names[0]),
                 LocalExecutor,
             )
-            assert isinstance(ExecutorLoader.load_executor(executor_loader._executor_names[0]), LocalExecutor)
@mock.patch("airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor",
autospec=True)
def test_load_custom_executor_with_classname(self, mock_executor):
@@ -319,15 +316,16 @@ class TestExecutorLoader:
):
"my_alias:airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor"
}
):
- ExecutorLoader.init_executors()
- assert isinstance(ExecutorLoader.load_executor("my_alias"),
AwsEcsExecutor)
- assert isinstance(ExecutorLoader.load_executor("AwsEcsExecutor"),
AwsEcsExecutor)
+ executor_loader.ExecutorLoader.init_executors()
+ assert
isinstance(executor_loader.ExecutorLoader.load_executor("my_alias"),
AwsEcsExecutor)
+ assert
isinstance(executor_loader.ExecutorLoader.load_executor("AwsEcsExecutor"),
AwsEcsExecutor)
assert isinstance(
- ExecutorLoader.load_executor(
+ executor_loader.ExecutorLoader.load_executor(
"airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor"
),
AwsEcsExecutor,
)
assert isinstance(
-
ExecutorLoader.load_executor(executor_loader._executor_names[0]), AwsEcsExecutor
+
executor_loader.ExecutorLoader.load_executor(executor_loader._executor_names[0]),
+ AwsEcsExecutor,
)
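
The dropped `reload(executor_loader)` setup was fragile: `importlib.reload` re-executes the module and rebinds its classes, so references captured before the reload (such as the `ExecutorLoader` name this test class used to patch in via `global`) go stale. A small, self-contained demonstration of the identity break:

    from importlib import reload

    from airflow.executors import executor_loader

    OldExecutorLoader = executor_loader.ExecutorLoader
    reload(executor_loader)
    # reload() recreates the class object, so earlier references no longer match.
    assert executor_loader.ExecutorLoader is not OldExecutorLoader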
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index f23eac76b6e..5f721b61d26 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -28,7 +28,6 @@ import warnings
 import weakref
 from contextlib import redirect_stdout
 from datetime import timedelta
-from importlib import reload
 from io import StringIO
 from pathlib import Path
 from typing import TYPE_CHECKING
@@ -56,7 +55,6 @@ from airflow.exceptions import (
     RemovedInAirflow3Warning,
     UnknownExecutorException,
 )
-from airflow.executors import executor_loader
 from airflow.executors.local_executor import LocalExecutor
 from airflow.executors.sequential_executor import SequentialExecutor
 from airflow.models.baseoperator import BaseOperator
@@ -3324,10 +3322,10 @@ class TestDagModel:
             ]
         }

+    @pytest.mark.usefixtures("clean_executor_loader")
     @mock.patch("airflow.models.dag.run_job")
     def test_dag_executors(self, run_job_mock):
         dag = DAG(dag_id="test", schedule=None)
-        reload(executor_loader)
         with conf_vars({("core", "executor"): "SequentialExecutor"}):
             dag.run()
             assert isinstance(run_job_mock.call_args_list[0].kwargs["job"].executor, SequentialExecutor)
diff --git a/tests/test_utils/executor_loader.py b/tests/test_utils/executor_loader.py
new file mode 100644
index 00000000000..cc28223b7ce
--- /dev/null
+++ b/tests/test_utils/executor_loader.py
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import airflow.executors.executor_loader as executor_loader
+
+if TYPE_CHECKING:
+ from airflow.executors.executor_utils import ExecutorName
+
+
+def clean_executor_loader_module():
+    """Clean the executor_loader state, as it stores global variables in the module, causing side effects for some tests."""
+    executor_loader._alias_to_executors: dict[str, ExecutorName] = {}
+    executor_loader._module_to_executors: dict[str, ExecutorName] = {}
+    executor_loader._classname_to_executors: dict[str, ExecutorName] = {}
+    executor_loader._executor_names: list[ExecutorName] = []
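
The helper rebinds the four module-level containers in place, which is cheaper and more predictable than reloading the module. A hypothetical direct use outside the fixture:

    from airflow.executors import executor_loader
    from tests.test_utils.executor_loader import clean_executor_loader_module

    clean_executor_loader_module()
    assert executor_loader._alias_to_executors == {}  # caches reset without a reload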
diff --git a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py
index 568d6abf025..9241145f7f5 100644
--- a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py
+++ b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py
@@ -48,6 +48,7 @@ def not_expected_tr_db_call():
         yield m


[email protected]("clean_executor_loader")
 class TestNotInReschedulePeriodDep:
     @pytest.fixture(autouse=True)
     def setup_test_cases(self, request, create_task_instance):
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index d3651370d65..95483f2285f 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -34,7 +34,7 @@ from requests.adapters import Response

 from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
 from airflow.exceptions import RemovedInAirflow3Warning
-from airflow.executors import executor_loader
+from airflow.executors import executor_constants, executor_loader
 from airflow.jobs.job import Job
from airflow.jobs.job import Job
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
from airflow.models.dag import DAG
@@ -202,6 +202,95 @@ class TestFileTaskLogHandler:
         # Remove the generated tmp log file.
         os.remove(log_filename)

+    @pytest.mark.parametrize(
+        "executor_name",
+        [
+            (executor_constants.LOCAL_KUBERNETES_EXECUTOR),
+            (executor_constants.CELERY_KUBERNETES_EXECUTOR),
+            (executor_constants.KUBERNETES_EXECUTOR),
+            (None),
+        ],
+    )
+    @conf_vars(
+        {
+            ("core", "EXECUTOR"): ",".join(
+                [
+                    executor_constants.LOCAL_KUBERNETES_EXECUTOR,
+                    executor_constants.CELERY_KUBERNETES_EXECUTOR,
+                    executor_constants.KUBERNETES_EXECUTOR,
+                ]
+            ),
+        }
+    )
+    @patch(
+        "airflow.executors.executor_loader.ExecutorLoader.load_executor",
+        wraps=executor_loader.ExecutorLoader.load_executor,
+    )
+    @patch(
+        "airflow.executors.executor_loader.ExecutorLoader.get_default_executor",
+        wraps=executor_loader.ExecutorLoader.get_default_executor,
+    )
+    def test_file_task_handler_with_multiple_executors(
+        self,
+        mock_get_default_executor,
+        mock_load_executor,
+        executor_name,
+        create_task_instance,
+        clean_executor_loader,
+    ):
+        executors_mapping = executor_loader.ExecutorLoader.executors
+        default_executor_name = executor_loader.ExecutorLoader.get_default_executor_name()
+        path_to_executor_class: str
+        if executor_name is None:
+            path_to_executor_class = executors_mapping.get(default_executor_name.alias)
+        else:
+            path_to_executor_class = executors_mapping.get(executor_name)
+
+        with patch(f"{path_to_executor_class}.get_task_log", return_value=([], [])) as mock_get_task_log:
+            mock_get_task_log.return_value = ([], [])
+            ti = create_task_instance(
+                dag_id="dag_for_testing_multiple_executors",
+                task_id="task_for_testing_multiple_executors",
+                run_type=DagRunType.SCHEDULED,
+                execution_date=DEFAULT_DATE,
+            )
+            if executor_name is not None:
+                ti.executor = executor_name
+            ti.try_number = 1
+            ti.state = TaskInstanceState.RUNNING
+            logger = ti.log
+            ti.log.disabled = False
+
+            file_handler = next(
+                (handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None
+            )
+            assert file_handler is not None
+
+            set_context(logger, ti)
+            # clear executor_instances cache
+            file_handler.executor_instances = {}
+            assert file_handler.handler is not None
+            # We expect set_context generates a file locally.
+            log_filename = file_handler.handler.baseFilename
+            assert os.path.isfile(log_filename)
+            assert log_filename.endswith("1.log"), log_filename
+
+            file_handler.flush()
+            file_handler.close()
+
+            assert hasattr(file_handler, "read")
+            file_handler.read(ti)
+            os.remove(log_filename)
+            mock_get_task_log.assert_called_once()
+
+            if executor_name is None:
+                mock_get_default_executor.assert_called_once()
+                # will be called in `ExecutorLoader.get_default_executor` method
+                mock_load_executor.assert_called_once_with(default_executor_name)
+            else:
+                mock_get_default_executor.assert_not_called()
+                mock_load_executor.assert_called_once_with(executor_name)
+
     def test_file_task_handler_running(self):
         def task_callable(ti):
             ti.log.info("test")
@@ -296,6 +385,7 @@ class TestFileTaskLogHandler:
     @mock.patch(
         "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.get_task_log"
     )
+    @pytest.mark.usefixtures("clean_executor_loader")
    @pytest.mark.parametrize("state", [TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS])
     def test__read_for_k8s_executor(self, mock_k8s_get_task_log, create_task_instance, state):
         """Test for k8s executor, the log is read from get_task_log method"""
@@ -309,6 +399,7 @@ class TestFileTaskLogHandler:
         )
         ti.state = state
         ti.triggerer_job = None
+        ti.executor = executor_name
         with conf_vars({("core", "executor"): executor_name}):
             reload(executor_loader)
             fth = FileTaskHandler("")
@@ -401,11 +492,12 @@ class TestFileTaskLogHandler:
             pytest.param(k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="pod-name-xxx")), "default"),
         ],
     )
-    @patch.dict("os.environ", AIRFLOW__CORE__EXECUTOR="KubernetesExecutor")
+    @conf_vars({("core", "executor"): "KubernetesExecutor"})
     @patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
     def test_read_from_k8s_under_multi_namespace_mode(
         self, mock_kube_client, pod_override, namespace_to_call
     ):
+        reload(executor_loader)
         mock_read_log = mock_kube_client.return_value.read_namespaced_pod_log
         mock_list_pod = mock_kube_client.return_value.list_namespaced_pod
@@ -426,6 +518,7 @@ class TestFileTaskLogHandler:
         )
         ti = TaskInstance(task=task, run_id=dagrun.run_id)
         ti.try_number = 3
+        ti.executor = "KubernetesExecutor"
         logger = ti.log
         ti.log.disabled = False
@@ -434,6 +527,8 @@ class TestFileTaskLogHandler:
         set_context(logger, ti)
         ti.run(ignore_ti_state=True)
         ti.state = TaskInstanceState.RUNNING
+        # clear executor_instances cache
+        file_handler.executor_instances = {}
         file_handler.read(ti, 2)

         # first we find pod name