This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cc9b9a8794d Revert "Remove fallback for old, pre 2.7, providers 
(#44755)" (#44886)
cc9b9a8794d is described below

commit cc9b9a8794da50630e26f5de04b4c0ae5d49353f
Author: Jarek Potiuk <[email protected]>
AuthorDate: Thu Dec 12 18:59:55 2024 +0100

    Revert "Remove fallback for old, pre 2.7, providers (#44755)" (#44886)
    
    This reverts commit 02821cc369205b7df5553b79dc19969f63ba0a95.
---
 .../cli/commands/remote_commands/task_command.py   |   1 -
 airflow/config_templates/pre_2_7_defaults.cfg      | 105 +++++++++++++++++++++
 airflow/configuration.py                           |  26 ++++-
 airflow/settings.py                                |   2 +-
 airflow/utils/db.py                                |  10 +-
 airflow/utils/log/file_task_handler.py             |   8 ++
 newsfragments/44755.significant.rst                |  26 -----
 tests/cli/conftest.py                              |  44 ++++-----
 tests/cli/test_cli_parser.py                       |   7 +-
 tests/core/test_configuration.py                   |   4 +-
 tests/executors/test_executor_loader.py            |   7 +-
 tests/sensors/test_base.py                         |  35 ++++++-
 tests/www/views/test_views_tasks.py                |   6 ++
 13 files changed, 203 insertions(+), 78 deletions(-)

diff --git a/airflow/cli/commands/remote_commands/task_command.py 
b/airflow/cli/commands/remote_commands/task_command.py
index 60691217dd5..b2795b7bf9d 100644
--- a/airflow/cli/commands/remote_commands/task_command.py
+++ b/airflow/cli/commands/remote_commands/task_command.py
@@ -405,7 +405,6 @@ class TaskCommandMarker:
 
 
 @cli_utils.action_cli(check_db=False)
-@providers_configuration_loaded
 def task_run(args, dag: DAG | None = None) -> TaskReturnCode | None:
     """
     Run a single task instance.
diff --git a/airflow/config_templates/pre_2_7_defaults.cfg 
b/airflow/config_templates/pre_2_7_defaults.cfg
new file mode 100644
index 00000000000..2568d42445f
--- /dev/null
+++ b/airflow/config_templates/pre_2_7_defaults.cfg
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This file contains pre Airflow 2.7, provider defaults for Airflow 
configuration.
+# They are provided as fallback option to older version of the
+# providers that might expect them to be present.
+#
+# NOTE !!!! Please DO NOT modify values in the file even if they change in 
corresponding
+# providers. The values here should be treated as "read only" and should not 
be modified
+# even if defaults in newer versions of corresponding Providers change.
+# They are only here so that backwards compatible behaviour for old provider
+# versions can be maintained.
+
+[atlas]
+sasl_enabled = False
+host =
+port = 21000
+username =
+password =
+
+[hive]
+default_hive_mapred_queue =
+
+[local_kubernetes_executor]
+kubernetes_queue = kubernetes
+
+[celery_kubernetes_executor]
+kubernetes_queue = kubernetes
+
+[celery]
+celery_app_name = airflow.executors.celery_executor
+worker_concurrency = 16
+worker_prefetch_multiplier = 1
+worker_enable_remote_control = true
+broker_url = redis://redis:6379/0
+result_backend_sqlalchemy_engine_options =
+flower_host = 0.0.0.0
+flower_url_prefix =
+flower_port = 5555
+flower_basic_auth =
+sync_parallelism = 0
+celery_config_options = 
airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
+ssl_active = False
+ssl_key =
+ssl_cert =
+ssl_cacert =
+pool = prefork
+operation_timeout = 1.0
+task_track_started = True
+task_publish_max_retries = 3
+worker_precheck = False
+
+[elasticsearch]
+host =
+log_id_template = {dag_id}-{task_id}-{run_id}-{map_index}-{try_number}
+end_of_log_mark = end_of_log
+frontend =
+write_stdout = False
+json_format = False
+json_fields = asctime, filename, lineno, levelname, message
+host_field = host
+offset_field = offset
+index_patterns = _all
+
+[elasticsearch_configs]
+use_ssl = False
+verify_certs = True
+
+[kubernetes_executor]
+api_client_retry_configuration =
+logs_task_metadata = False
+pod_template_file =
+worker_container_repository =
+worker_container_tag =
+namespace = default
+delete_worker_pods = True
+delete_worker_pods_on_failure = False
+worker_pods_creation_batch_size = 1
+multi_namespace_mode = False
+multi_namespace_mode_namespace_list =
+in_cluster = True
+kube_client_request_args =
+delete_option_kwargs =
+enable_tcp_keepalive = True
+tcp_keep_idle = 120
+tcp_keep_intvl = 30
+tcp_keep_cnt = 6
+verify_ssl = True
+worker_pods_queued_check_interval = 60
+ssl_ca_cert =
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 47d2361dfaf..76e2690ffe1 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -209,6 +209,7 @@ class AirflowConfigParser(ConfigParser):
         # interpolation placeholders. The _default_values config parser will 
interpolate them
         # properly when we call get() on it.
         self._default_values = 
create_default_config_parser(self.configuration_description)
+        self._pre_2_7_default_values = create_pre_2_7_defaults()
         if default_config is not None:
             self._update_defaults_from_string(default_config)
         self._update_logging_deprecated_template_to_one_from_defaults()
@@ -291,6 +292,10 @@ class AirflowConfigParser(ConfigParser):
             return value.replace("%", "%%")
         return value
 
+    def get_default_pre_2_7_value(self, section: str, key: str, **kwargs) -> 
Any:
+        """Get pre 2.7 default config values."""
+        return self._pre_2_7_default_values.get(section, key, fallback=None, 
**kwargs)
+
     # These configuration elements can be fetched as the stdout of commands
     # following the "{section}__{name}_cmd" pattern, the idea behind this
     # is to not store password on boxes in text files.
@@ -449,8 +454,6 @@ class AirflowConfigParser(ConfigParser):
         elif not with_providers and self._providers_configuration_loaded:
             reload_providers_when_leaving = True
             self.restore_core_default_configuration()
-        elif self._providers_configuration_loaded:
-            reload_providers_when_leaving = True
         yield
         if reload_providers_when_leaving:
             self.load_providers_configuration()
@@ -986,6 +989,10 @@ class AirflowConfigParser(ConfigParser):
         if self.get_default_value(section, key) is not None or "fallback" in 
kwargs:
             return expand_env_var(self.get_default_value(section, key, 
**kwargs))
 
+        if self.get_default_pre_2_7_value(section, key) is not None:
+            # no expansion needed
+            return self.get_default_pre_2_7_value(section, key, **kwargs)
+
         if not suppress_warnings:
             log.warning("section/key [%s/%s] not found in config", section, 
key)
 
@@ -1375,6 +1382,7 @@ class AirflowConfigParser(ConfigParser):
 
         # We check sequentially all those sources and the last one we saw it 
in will "win"
         configs: Iterable[tuple[str, ConfigParser]] = [
+            ("default-pre-2-7", self._pre_2_7_default_values),
             ("default", self._default_values),
             ("airflow.cfg", self),
         ]
@@ -1893,6 +1901,20 @@ def 
create_default_config_parser(configuration_description: dict[str, dict[str,
     return parser
 
 
+def create_pre_2_7_defaults() -> ConfigParser:
+    """
+    Create parser using the old defaults from Airflow < 2.7.0.
+
+    This is used in order to be able to fall-back to those defaults when old 
version of provider,
+    not supporting "config contribution" is installed with Airflow 2.7.0+. 
This "default"
+    configuration does not support variable expansion, those are pretty much 
hard-coded defaults '
+    we want to fall-back to in such case.
+    """
+    config_parser = ConfigParser()
+    config_parser.read(_default_config_file_path("pre_2_7_defaults.cfg"))
+    return config_parser
+
+
 def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
     airflow_config = pathlib.Path(AIRFLOW_CONFIG)
     if airflow_config.is_dir():
diff --git a/airflow/settings.py b/airflow/settings.py
index e9b560d5f97..afde6d68d7d 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -535,7 +535,7 @@ def validate_session():
     """Validate ORM Session."""
     global engine
 
-    worker_precheck = conf.getboolean("celery", "worker_precheck", 
fallback=False)
+    worker_precheck = conf.getboolean("celery", "worker_precheck")
     if not worker_precheck:
         return True
     else:
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 3f30ed0a642..fe5c4c0a791 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -950,12 +950,8 @@ def synchronize_log_template(*, session: Session = 
NEW_SESSION) -> None:
         log.info("Log template table does not exist (added in 2.3.0); skipping 
log template sync.")
         return
 
-    es_log_it_template_fallback = 
"{dag_id}-{task_id}-{execution_date}-{try_number}"
-
     filename = conf.get("logging", "log_filename_template")
-    elasticsearch_id = conf.get("elasticsearch", "log_id_template", 
fallback=es_log_it_template_fallback)
-
-    # TODO: The elasticsearch specific stuff here is probably inappropriate - 
provider is bleeding into core
+    elasticsearch_id = conf.get("elasticsearch", "log_id_template")
 
     stored = session.execute(
         select(
@@ -969,9 +965,7 @@ def synchronize_log_template(*, session: Session = 
NEW_SESSION) -> None:
     # If we have an empty table, and the default values exist, we will seed the
     # table with values from pre 2.3.0, so old logs will still be retrievable.
     if not stored:
-        is_default_log_id = elasticsearch_id == conf.get_default_value(
-            "elasticsearch", "log_id_template", 
fallback=es_log_it_template_fallback
-        )
+        is_default_log_id = elasticsearch_id == 
conf.get_default_value("elasticsearch", "log_id_template")
         is_default_filename = filename == conf.get_default_value("logging", 
"log_filename_template")
         if is_default_log_id and is_default_filename:
             session.add(
diff --git a/airflow/utils/log/file_task_handler.py 
b/airflow/utils/log/file_task_handler.py
index e1d990d6e82..09866de7214 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -404,6 +404,14 @@ class FileTaskHandler(logging.Handler):
         out_message = logs if "log_pos" in (metadata or {}) else messages + 
logs
         return out_message, {"end_of_log": end_of_log, "log_pos": log_pos}
 
+    @staticmethod
+    def _get_pod_namespace(ti: TaskInstance):
+        pod_override = ti.executor_config.get("pod_override")
+        namespace = None
+        with suppress(Exception):
+            namespace = pod_override.metadata.namespace
+        return namespace or conf.get("kubernetes_executor", "namespace")
+
     def _get_log_retrieval_url(
         self, ti: TaskInstance, log_relative_path: str, log_type: LogType | 
None = None
     ) -> tuple[str, str]:
diff --git a/newsfragments/44755.significant.rst 
b/newsfragments/44755.significant.rst
deleted file mode 100644
index 38fe6e8bf63..00000000000
--- a/newsfragments/44755.significant.rst
+++ /dev/null
@@ -1,26 +0,0 @@
-Provider configuration fallbacks removed from core
-
-Around ~2.7, we moved provider config from core into the providers themselves.
-However, if providers that were released before that change was used on 2.7+,
-there could be failures because the config wouldn't be in those providers!
-So, we added a fallback for all of the configs that were moved. These fallbacks
-have now been removed. You must use providers that contain their own config.
-
-TODO: It's unlikely this will be the change that sets the "lowest" min 
provider version
-for compatibility with Airflow 3. But, it is possible, and if it is we will 
need to
-then determine what those versions are.
-
-Note: This changes doesn't really fix in any of the following "types", but 
I've marked
-dependency change so there is something checked.
-
-.. Check the type of change that applies to this change
-
-* Types of change
-
-  * [ ] DAG changes
-  * [ ] Config changes
-  * [ ] API changes
-  * [ ] CLI changes
-  * [ ] Behaviour changes
-  * [ ] Plugin changes
-  * [x] Dependency change
diff --git a/tests/cli/conftest.py b/tests/cli/conftest.py
index f8ee6b930f6..17f891c94f8 100644
--- a/tests/cli/conftest.py
+++ b/tests/cli/conftest.py
@@ -23,33 +23,29 @@ import pytest
 
 from airflow.executors import local_executor
 from airflow.models.dagbag import DagBag
+from airflow.providers.celery.executors import celery_executor, 
celery_kubernetes_executor
+from airflow.providers.cncf.kubernetes.executors import kubernetes_executor, 
local_kubernetes_executor
 
 from tests_common.test_utils.config import conf_vars
 
-
[email protected]
-def custom_executors():
-    from airflow.providers.celery.executors import celery_executor, 
celery_kubernetes_executor
-    from airflow.providers.cncf.kubernetes.executors import 
kubernetes_executor, local_kubernetes_executor
-
-    # Create custom executors here because conftest is imported first
-    custom_executor_module = type(sys)("custom_executor")
-    custom_executor_module.CustomCeleryExecutor = type(  # type:  ignore
-        "CustomCeleryExecutor", (celery_executor.CeleryExecutor,), {}
-    )
-    custom_executor_module.CustomCeleryKubernetesExecutor = type(  # type: 
ignore
-        "CustomCeleryKubernetesExecutor", 
(celery_kubernetes_executor.CeleryKubernetesExecutor,), {}
-    )
-    custom_executor_module.CustomLocalExecutor = type(  # type:  ignore
-        "CustomLocalExecutor", (local_executor.LocalExecutor,), {}
-    )
-    custom_executor_module.CustomLocalKubernetesExecutor = type(  # type: 
ignore
-        "CustomLocalKubernetesExecutor", 
(local_kubernetes_executor.LocalKubernetesExecutor,), {}
-    )
-    custom_executor_module.CustomKubernetesExecutor = type(  # type:  ignore
-        "CustomKubernetesExecutor", (kubernetes_executor.KubernetesExecutor,), 
{}
-    )
-    sys.modules["custom_executor"] = custom_executor_module
+# Create custom executors here because conftest is imported first
+custom_executor_module = type(sys)("custom_executor")
+custom_executor_module.CustomCeleryExecutor = type(  # type:  ignore
+    "CustomCeleryExecutor", (celery_executor.CeleryExecutor,), {}
+)
+custom_executor_module.CustomCeleryKubernetesExecutor = type(  # type: ignore
+    "CustomCeleryKubernetesExecutor", 
(celery_kubernetes_executor.CeleryKubernetesExecutor,), {}
+)
+custom_executor_module.CustomLocalExecutor = type(  # type:  ignore
+    "CustomLocalExecutor", (local_executor.LocalExecutor,), {}
+)
+custom_executor_module.CustomLocalKubernetesExecutor = type(  # type: ignore
+    "CustomLocalKubernetesExecutor", 
(local_kubernetes_executor.LocalKubernetesExecutor,), {}
+)
+custom_executor_module.CustomKubernetesExecutor = type(  # type:  ignore
+    "CustomKubernetesExecutor", (kubernetes_executor.KubernetesExecutor,), {}
+)
+sys.modules["custom_executor"] = custom_executor_module
 
 
 @pytest.fixture(autouse=True)
diff --git a/tests/cli/test_cli_parser.py b/tests/cli/test_cli_parser.py
index 3ea3473c3f8..23363b379d8 100644
--- a/tests/cli/test_cli_parser.py
+++ b/tests/cli/test_cli_parser.py
@@ -42,6 +42,7 @@ from airflow.executors import executor_loader
 from airflow.executors.executor_utils import ExecutorName
 from airflow.executors.local_executor import LocalExecutor
 from airflow.providers.amazon.aws.executors.ecs.ecs_executor import 
AwsEcsExecutor
+from airflow.providers.celery.executors.celery_executor import CeleryExecutor
 
 from tests_common.test_utils.config import conf_vars
 
@@ -166,7 +167,7 @@ class TestCli:
             # force re-evaluation of cli commands (done in top level code)
             reload(cli_parser)
 
-    
@patch("airflow.providers.celery.executors.celery_executor.CeleryExecutor.get_cli_commands")
+    @patch.object(CeleryExecutor, "get_cli_commands")
     @patch.object(AwsEcsExecutor, "get_cli_commands")
     def test_hybrid_executor_get_cli_commands(
         self, ecs_executor_cli_commands_mock, celery_executor_cli_commands_mock
@@ -200,7 +201,7 @@ class TestCli:
         assert celery_executor_command.name in commands
         assert ecs_executor_command.name in commands
 
-    
@patch("airflow.providers.celery.executors.celery_executor.CeleryExecutor.get_cli_commands")
+    @patch.object(CeleryExecutor, "get_cli_commands")
     @patch.object(AwsEcsExecutor, "get_cli_commands")
     def test_hybrid_executor_get_cli_commands_with_error(
         self, ecs_executor_cli_commands_mock, 
celery_executor_cli_commands_mock, caplog
@@ -377,7 +378,7 @@ class TestCli:
             ("custom_executor.CustomKubernetesExecutor", ["kubernetes"]),
         ],
     )
-    def test_cli_parser_executors(self, custom_executors, executor, 
expected_args):
+    def test_cli_parser_executors(self, executor, expected_args):
         """Test that CLI commands for the configured executor are present"""
         for expected_arg in expected_args:
             with (
diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py
index efc88d476ea..b19d6ff0cf2 100644
--- a/tests/core/test_configuration.py
+++ b/tests/core/test_configuration.py
@@ -1594,8 +1594,8 @@ def test_restore_and_reload_provider_configuration():
     assert conf.get("celery", "celery_app_name") == 
"airflow.providers.celery.executors.celery_executor"
     conf.restore_core_default_configuration()
     assert conf.providers_configuration_loaded is False
-    with pytest.raises(AirflowConfigException, match="not found in config"):
-        conf.get("celery", "celery_app_name")
+    # built-in pre-2-7 celery executor
+    assert conf.get("celery", "celery_app_name") == 
"airflow.executors.celery_executor"
     conf.load_providers_configuration()
     assert conf.providers_configuration_loaded is True
     assert conf.get("celery", "celery_app_name") == 
"airflow.providers.celery.executors.celery_executor"
diff --git a/tests/executors/test_executor_loader.py 
b/tests/executors/test_executor_loader.py
index d783952b66c..48c531c5cb9 100644
--- a/tests/executors/test_executor_loader.py
+++ b/tests/executors/test_executor_loader.py
@@ -26,7 +26,7 @@ from airflow.executors import executor_loader
 from airflow.executors.executor_loader import ConnectorSource, ExecutorLoader, 
ExecutorName
 from airflow.executors.local_executor import LocalExecutor
 from airflow.providers.amazon.aws.executors.ecs.ecs_executor import 
AwsEcsExecutor
-from airflow.providers_manager import ProvidersManager
+from airflow.providers.celery.executors.celery_executor import CeleryExecutor
 
 from tests_common.test_utils.config import conf_vars
 
@@ -141,11 +141,6 @@ class TestExecutorLoader:
             assert executors == expected_executors_list
 
     def test_init_executors(self):
-        # We need to init provider config in order to import CeleryExecutor
-        ProvidersManager().initialize_providers_configuration()
-
-        from airflow.providers.celery.executors.celery_executor import 
CeleryExecutor
-
         with conf_vars({("core", "executor"): "CeleryExecutor"}):
             executors = ExecutorLoader.init_executors()
             executor_name = ExecutorLoader.get_default_executor_name()
diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py
index 53096576384..9bb4f5b9934 100644
--- a/tests/sensors/test_base.py
+++ b/tests/sensors/test_base.py
@@ -33,11 +33,25 @@ from airflow.exceptions import (
     AirflowTaskTimeout,
 )
 from airflow.executors.debug_executor import DebugExecutor
+from airflow.executors.executor_constants import (
+    CELERY_EXECUTOR,
+    CELERY_KUBERNETES_EXECUTOR,
+    DEBUG_EXECUTOR,
+    KUBERNETES_EXECUTOR,
+    LOCAL_EXECUTOR,
+    LOCAL_KUBERNETES_EXECUTOR,
+    SEQUENTIAL_EXECUTOR,
+)
 from airflow.executors.local_executor import LocalExecutor
+from airflow.executors.sequential_executor import SequentialExecutor
 from airflow.models import TaskInstance, TaskReschedule
 from airflow.models.trigger import TriggerFailureReason
 from airflow.models.xcom import XCom
 from airflow.operators.empty import EmptyOperator
+from airflow.providers.celery.executors.celery_executor import CeleryExecutor
+from airflow.providers.celery.executors.celery_kubernetes_executor import 
CeleryKubernetesExecutor
+from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import 
KubernetesExecutor
+from airflow.providers.cncf.kubernetes.executors.local_kubernetes_executor 
import LocalKubernetesExecutor
 from airflow.sensors.base import BaseSensorOperator, PokeReturnValue, 
poke_mode_only
 from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
 from airflow.utils import timezone
@@ -1008,20 +1022,31 @@ class TestBaseSensor:
         assert actual_xcom_value is None
 
     @pytest.mark.parametrize(
-        "executor_cls, expected_mode",
+        "executor_cls_mode",
         [
+            (CeleryExecutor, "poke"),
+            (CeleryKubernetesExecutor, "poke"),
             (DebugExecutor, "reschedule"),
+            (KubernetesExecutor, "poke"),
             (LocalExecutor, "poke"),
+            (LocalKubernetesExecutor, "poke"),
+            (SequentialExecutor, "poke"),
         ],
         ids=[
-            "debug_executor_reschedule",
-            "local_executor_poke",
+            CELERY_EXECUTOR,
+            CELERY_KUBERNETES_EXECUTOR,
+            DEBUG_EXECUTOR,
+            KUBERNETES_EXECUTOR,
+            LOCAL_EXECUTOR,
+            LOCAL_KUBERNETES_EXECUTOR,
+            SEQUENTIAL_EXECUTOR,
         ],
     )
-    def test_change_sensor_mode_to_reschedule(self, executor_cls, 
expected_mode):
+    def test_prepare_for_execution(self, executor_cls_mode):
         """
         Should change mode of the task to reschedule if using DEBUG_EXECUTOR
         """
+        executor_cls, mode = executor_cls_mode
         sensor = DummySensor(
             task_id=SENSOR_OP,
             return_value=None,
@@ -1035,7 +1060,7 @@ class TestBaseSensor:
         ) as load_executor:
             load_executor.return_value = (executor_cls, None)
             task = sensor.prepare_for_execution()
-            assert task.mode == expected_mode
+            assert task.mode == mode
 
     def test_resume_execution(self):
         op = BaseSensorOperator(task_id="hi")
diff --git a/tests/www/views/test_views_tasks.py 
b/tests/www/views/test_views_tasks.py
index 4c05c46152b..11c155d3d60 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -35,6 +35,7 @@ from airflow.models.taskinstance import TaskInstance
 from airflow.models.taskreschedule import TaskReschedule
 from airflow.models.xcom import XCom
 from airflow.operators.empty import EmptyOperator
+from airflow.providers.celery.executors.celery_executor import CeleryExecutor
 from airflow.security import permissions
 from airflow.utils import timezone
 from airflow.utils.log.logging_mixin import ExternalLoggingMixin
@@ -609,6 +610,11 @@ def test_dag_never_run(admin_client, url):
     check_content_in_response(f"Cannot mark tasks as {url}, seem that DAG 
{dag_id} has never run", resp)
 
 
+class _ForceHeartbeatCeleryExecutor(CeleryExecutor):
+    def heartbeat(self):
+        return True
+
+
 def test_delete_dag_button_for_dag_on_scheduler_only(admin_client, dag_maker):
     with dag_maker() as dag:
         EmptyOperator(task_id="task")

Reply via email to