This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 02821cc3692 Remove fallback for old, pre 2.7, providers (#44755)
02821cc3692 is described below

commit 02821cc369205b7df5553b79dc19969f63ba0a95
Author: Jed Cunningham <[email protected]>
AuthorDate: Thu Dec 12 09:52:02 2024 -0700

    Remove fallback for old, pre 2.7, providers (#44755)
    
    In ~2.7, we moved provider config from core into the providers themselves.
    However, if providers that were released before that change was used on 
2.7+,
    there could be failures because the config wouldn't be in those providers!
    So, we added a fallback for all of the configs that were moved.
    
    As we move toward Airflow 3, we don't need to carry this baggage
    forever. This does mean provider released in earlier than mid 2023 won't
    work with Airflow 3 (and I'd imagine there will be other reasons they
    wont work as well).
---
 .../cli/commands/remote_commands/task_command.py   |   1 +
 airflow/config_templates/pre_2_7_defaults.cfg      | 105 ---------------------
 airflow/configuration.py                           |  26 +----
 airflow/settings.py                                |   2 +-
 airflow/utils/db.py                                |  10 +-
 airflow/utils/log/file_task_handler.py             |   8 --
 newsfragments/44755.significant.rst                |  26 +++++
 tests/cli/conftest.py                              |  44 +++++----
 tests/cli/test_cli_parser.py                       |   7 +-
 tests/core/test_configuration.py                   |   4 +-
 tests/executors/test_executor_loader.py            |   7 +-
 tests/sensors/test_base.py                         |  35 +------
 tests/www/views/test_views_tasks.py                |   6 --
 13 files changed, 78 insertions(+), 203 deletions(-)

diff --git a/airflow/cli/commands/remote_commands/task_command.py 
b/airflow/cli/commands/remote_commands/task_command.py
index b2795b7bf9d..60691217dd5 100644
--- a/airflow/cli/commands/remote_commands/task_command.py
+++ b/airflow/cli/commands/remote_commands/task_command.py
@@ -405,6 +405,7 @@ class TaskCommandMarker:
 
 
 @cli_utils.action_cli(check_db=False)
+@providers_configuration_loaded
 def task_run(args, dag: DAG | None = None) -> TaskReturnCode | None:
     """
     Run a single task instance.
diff --git a/airflow/config_templates/pre_2_7_defaults.cfg 
b/airflow/config_templates/pre_2_7_defaults.cfg
deleted file mode 100644
index 2568d42445f..00000000000
--- a/airflow/config_templates/pre_2_7_defaults.cfg
+++ /dev/null
@@ -1,105 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# This file contains pre Airflow 2.7, provider defaults for Airflow 
configuration.
-# They are provided as fallback option to older version of the
-# providers that might expect them to be present.
-#
-# NOTE !!!! Please DO NOT modify values in the file even if they change in 
corresponding
-# providers. The values here should be treated as "read only" and should not 
be modified
-# even if defaults in newer versions of corresponding Providers change.
-# They are only here so that backwards compatible behaviour for old provider
-# versions can be maintained.
-
-[atlas]
-sasl_enabled = False
-host =
-port = 21000
-username =
-password =
-
-[hive]
-default_hive_mapred_queue =
-
-[local_kubernetes_executor]
-kubernetes_queue = kubernetes
-
-[celery_kubernetes_executor]
-kubernetes_queue = kubernetes
-
-[celery]
-celery_app_name = airflow.executors.celery_executor
-worker_concurrency = 16
-worker_prefetch_multiplier = 1
-worker_enable_remote_control = true
-broker_url = redis://redis:6379/0
-result_backend_sqlalchemy_engine_options =
-flower_host = 0.0.0.0
-flower_url_prefix =
-flower_port = 5555
-flower_basic_auth =
-sync_parallelism = 0
-celery_config_options = 
airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
-ssl_active = False
-ssl_key =
-ssl_cert =
-ssl_cacert =
-pool = prefork
-operation_timeout = 1.0
-task_track_started = True
-task_publish_max_retries = 3
-worker_precheck = False
-
-[elasticsearch]
-host =
-log_id_template = {dag_id}-{task_id}-{run_id}-{map_index}-{try_number}
-end_of_log_mark = end_of_log
-frontend =
-write_stdout = False
-json_format = False
-json_fields = asctime, filename, lineno, levelname, message
-host_field = host
-offset_field = offset
-index_patterns = _all
-
-[elasticsearch_configs]
-use_ssl = False
-verify_certs = True
-
-[kubernetes_executor]
-api_client_retry_configuration =
-logs_task_metadata = False
-pod_template_file =
-worker_container_repository =
-worker_container_tag =
-namespace = default
-delete_worker_pods = True
-delete_worker_pods_on_failure = False
-worker_pods_creation_batch_size = 1
-multi_namespace_mode = False
-multi_namespace_mode_namespace_list =
-in_cluster = True
-kube_client_request_args =
-delete_option_kwargs =
-enable_tcp_keepalive = True
-tcp_keep_idle = 120
-tcp_keep_intvl = 30
-tcp_keep_cnt = 6
-verify_ssl = True
-worker_pods_queued_check_interval = 60
-ssl_ca_cert =
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 76e2690ffe1..47d2361dfaf 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -209,7 +209,6 @@ class AirflowConfigParser(ConfigParser):
         # interpolation placeholders. The _default_values config parser will 
interpolate them
         # properly when we call get() on it.
         self._default_values = 
create_default_config_parser(self.configuration_description)
-        self._pre_2_7_default_values = create_pre_2_7_defaults()
         if default_config is not None:
             self._update_defaults_from_string(default_config)
         self._update_logging_deprecated_template_to_one_from_defaults()
@@ -292,10 +291,6 @@ class AirflowConfigParser(ConfigParser):
             return value.replace("%", "%%")
         return value
 
-    def get_default_pre_2_7_value(self, section: str, key: str, **kwargs) -> 
Any:
-        """Get pre 2.7 default config values."""
-        return self._pre_2_7_default_values.get(section, key, fallback=None, 
**kwargs)
-
     # These configuration elements can be fetched as the stdout of commands
     # following the "{section}__{name}_cmd" pattern, the idea behind this
     # is to not store password on boxes in text files.
@@ -454,6 +449,8 @@ class AirflowConfigParser(ConfigParser):
         elif not with_providers and self._providers_configuration_loaded:
             reload_providers_when_leaving = True
             self.restore_core_default_configuration()
+        elif self._providers_configuration_loaded:
+            reload_providers_when_leaving = True
         yield
         if reload_providers_when_leaving:
             self.load_providers_configuration()
@@ -989,10 +986,6 @@ class AirflowConfigParser(ConfigParser):
         if self.get_default_value(section, key) is not None or "fallback" in 
kwargs:
             return expand_env_var(self.get_default_value(section, key, 
**kwargs))
 
-        if self.get_default_pre_2_7_value(section, key) is not None:
-            # no expansion needed
-            return self.get_default_pre_2_7_value(section, key, **kwargs)
-
         if not suppress_warnings:
             log.warning("section/key [%s/%s] not found in config", section, 
key)
 
@@ -1382,7 +1375,6 @@ class AirflowConfigParser(ConfigParser):
 
         # We check sequentially all those sources and the last one we saw it 
in will "win"
         configs: Iterable[tuple[str, ConfigParser]] = [
-            ("default-pre-2-7", self._pre_2_7_default_values),
             ("default", self._default_values),
             ("airflow.cfg", self),
         ]
@@ -1901,20 +1893,6 @@ def 
create_default_config_parser(configuration_description: dict[str, dict[str,
     return parser
 
 
-def create_pre_2_7_defaults() -> ConfigParser:
-    """
-    Create parser using the old defaults from Airflow < 2.7.0.
-
-    This is used in order to be able to fall-back to those defaults when old 
version of provider,
-    not supporting "config contribution" is installed with Airflow 2.7.0+. 
This "default"
-    configuration does not support variable expansion, those are pretty much 
hard-coded defaults '
-    we want to fall-back to in such case.
-    """
-    config_parser = ConfigParser()
-    config_parser.read(_default_config_file_path("pre_2_7_defaults.cfg"))
-    return config_parser
-
-
 def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
     airflow_config = pathlib.Path(AIRFLOW_CONFIG)
     if airflow_config.is_dir():
diff --git a/airflow/settings.py b/airflow/settings.py
index afde6d68d7d..e9b560d5f97 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -535,7 +535,7 @@ def validate_session():
     """Validate ORM Session."""
     global engine
 
-    worker_precheck = conf.getboolean("celery", "worker_precheck")
+    worker_precheck = conf.getboolean("celery", "worker_precheck", 
fallback=False)
     if not worker_precheck:
         return True
     else:
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index fe5c4c0a791..3f30ed0a642 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -950,8 +950,12 @@ def synchronize_log_template(*, session: Session = 
NEW_SESSION) -> None:
         log.info("Log template table does not exist (added in 2.3.0); skipping 
log template sync.")
         return
 
+    es_log_it_template_fallback = 
"{dag_id}-{task_id}-{execution_date}-{try_number}"
+
     filename = conf.get("logging", "log_filename_template")
-    elasticsearch_id = conf.get("elasticsearch", "log_id_template")
+    elasticsearch_id = conf.get("elasticsearch", "log_id_template", 
fallback=es_log_it_template_fallback)
+
+    # TODO: The elasticsearch specific stuff here is probably inappropriate - 
provider is bleeding into core
 
     stored = session.execute(
         select(
@@ -965,7 +969,9 @@ def synchronize_log_template(*, session: Session = 
NEW_SESSION) -> None:
     # If we have an empty table, and the default values exist, we will seed the
     # table with values from pre 2.3.0, so old logs will still be retrievable.
     if not stored:
-        is_default_log_id = elasticsearch_id == 
conf.get_default_value("elasticsearch", "log_id_template")
+        is_default_log_id = elasticsearch_id == conf.get_default_value(
+            "elasticsearch", "log_id_template", 
fallback=es_log_it_template_fallback
+        )
         is_default_filename = filename == conf.get_default_value("logging", 
"log_filename_template")
         if is_default_log_id and is_default_filename:
             session.add(
diff --git a/airflow/utils/log/file_task_handler.py 
b/airflow/utils/log/file_task_handler.py
index 09866de7214..e1d990d6e82 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -404,14 +404,6 @@ class FileTaskHandler(logging.Handler):
         out_message = logs if "log_pos" in (metadata or {}) else messages + 
logs
         return out_message, {"end_of_log": end_of_log, "log_pos": log_pos}
 
-    @staticmethod
-    def _get_pod_namespace(ti: TaskInstance):
-        pod_override = ti.executor_config.get("pod_override")
-        namespace = None
-        with suppress(Exception):
-            namespace = pod_override.metadata.namespace
-        return namespace or conf.get("kubernetes_executor", "namespace")
-
     def _get_log_retrieval_url(
         self, ti: TaskInstance, log_relative_path: str, log_type: LogType | 
None = None
     ) -> tuple[str, str]:
diff --git a/newsfragments/44755.significant.rst 
b/newsfragments/44755.significant.rst
new file mode 100644
index 00000000000..38fe6e8bf63
--- /dev/null
+++ b/newsfragments/44755.significant.rst
@@ -0,0 +1,26 @@
+Provider configuration fallbacks removed from core
+
+Around ~2.7, we moved provider config from core into the providers themselves.
+However, if providers that were released before that change was used on 2.7+,
+there could be failures because the config wouldn't be in those providers!
+So, we added a fallback for all of the configs that were moved. These fallbacks
+have now been removed. You must use providers that contain their own config.
+
+TODO: It's unlikely this will be the change that sets the "lowest" min 
provider version
+for compatibility with Airflow 3. But, it is possible, and if it is we will 
need to
+then determine what those versions are.
+
+Note: This changes doesn't really fix in any of the following "types", but 
I've marked
+dependency change so there is something checked.
+
+.. Check the type of change that applies to this change
+
+* Types of change
+
+  * [ ] DAG changes
+  * [ ] Config changes
+  * [ ] API changes
+  * [ ] CLI changes
+  * [ ] Behaviour changes
+  * [ ] Plugin changes
+  * [x] Dependency change
diff --git a/tests/cli/conftest.py b/tests/cli/conftest.py
index 17f891c94f8..f8ee6b930f6 100644
--- a/tests/cli/conftest.py
+++ b/tests/cli/conftest.py
@@ -23,29 +23,33 @@ import pytest
 
 from airflow.executors import local_executor
 from airflow.models.dagbag import DagBag
-from airflow.providers.celery.executors import celery_executor, 
celery_kubernetes_executor
-from airflow.providers.cncf.kubernetes.executors import kubernetes_executor, 
local_kubernetes_executor
 
 from tests_common.test_utils.config import conf_vars
 
-# Create custom executors here because conftest is imported first
-custom_executor_module = type(sys)("custom_executor")
-custom_executor_module.CustomCeleryExecutor = type(  # type:  ignore
-    "CustomCeleryExecutor", (celery_executor.CeleryExecutor,), {}
-)
-custom_executor_module.CustomCeleryKubernetesExecutor = type(  # type: ignore
-    "CustomCeleryKubernetesExecutor", 
(celery_kubernetes_executor.CeleryKubernetesExecutor,), {}
-)
-custom_executor_module.CustomLocalExecutor = type(  # type:  ignore
-    "CustomLocalExecutor", (local_executor.LocalExecutor,), {}
-)
-custom_executor_module.CustomLocalKubernetesExecutor = type(  # type: ignore
-    "CustomLocalKubernetesExecutor", 
(local_kubernetes_executor.LocalKubernetesExecutor,), {}
-)
-custom_executor_module.CustomKubernetesExecutor = type(  # type:  ignore
-    "CustomKubernetesExecutor", (kubernetes_executor.KubernetesExecutor,), {}
-)
-sys.modules["custom_executor"] = custom_executor_module
+
[email protected]
+def custom_executors():
+    from airflow.providers.celery.executors import celery_executor, 
celery_kubernetes_executor
+    from airflow.providers.cncf.kubernetes.executors import 
kubernetes_executor, local_kubernetes_executor
+
+    # Create custom executors here because conftest is imported first
+    custom_executor_module = type(sys)("custom_executor")
+    custom_executor_module.CustomCeleryExecutor = type(  # type:  ignore
+        "CustomCeleryExecutor", (celery_executor.CeleryExecutor,), {}
+    )
+    custom_executor_module.CustomCeleryKubernetesExecutor = type(  # type: 
ignore
+        "CustomCeleryKubernetesExecutor", 
(celery_kubernetes_executor.CeleryKubernetesExecutor,), {}
+    )
+    custom_executor_module.CustomLocalExecutor = type(  # type:  ignore
+        "CustomLocalExecutor", (local_executor.LocalExecutor,), {}
+    )
+    custom_executor_module.CustomLocalKubernetesExecutor = type(  # type: 
ignore
+        "CustomLocalKubernetesExecutor", 
(local_kubernetes_executor.LocalKubernetesExecutor,), {}
+    )
+    custom_executor_module.CustomKubernetesExecutor = type(  # type:  ignore
+        "CustomKubernetesExecutor", (kubernetes_executor.KubernetesExecutor,), 
{}
+    )
+    sys.modules["custom_executor"] = custom_executor_module
 
 
 @pytest.fixture(autouse=True)
diff --git a/tests/cli/test_cli_parser.py b/tests/cli/test_cli_parser.py
index 23363b379d8..3ea3473c3f8 100644
--- a/tests/cli/test_cli_parser.py
+++ b/tests/cli/test_cli_parser.py
@@ -42,7 +42,6 @@ from airflow.executors import executor_loader
 from airflow.executors.executor_utils import ExecutorName
 from airflow.executors.local_executor import LocalExecutor
 from airflow.providers.amazon.aws.executors.ecs.ecs_executor import 
AwsEcsExecutor
-from airflow.providers.celery.executors.celery_executor import CeleryExecutor
 
 from tests_common.test_utils.config import conf_vars
 
@@ -167,7 +166,7 @@ class TestCli:
             # force re-evaluation of cli commands (done in top level code)
             reload(cli_parser)
 
-    @patch.object(CeleryExecutor, "get_cli_commands")
+    
@patch("airflow.providers.celery.executors.celery_executor.CeleryExecutor.get_cli_commands")
     @patch.object(AwsEcsExecutor, "get_cli_commands")
     def test_hybrid_executor_get_cli_commands(
         self, ecs_executor_cli_commands_mock, celery_executor_cli_commands_mock
@@ -201,7 +200,7 @@ class TestCli:
         assert celery_executor_command.name in commands
         assert ecs_executor_command.name in commands
 
-    @patch.object(CeleryExecutor, "get_cli_commands")
+    
@patch("airflow.providers.celery.executors.celery_executor.CeleryExecutor.get_cli_commands")
     @patch.object(AwsEcsExecutor, "get_cli_commands")
     def test_hybrid_executor_get_cli_commands_with_error(
         self, ecs_executor_cli_commands_mock, 
celery_executor_cli_commands_mock, caplog
@@ -378,7 +377,7 @@ class TestCli:
             ("custom_executor.CustomKubernetesExecutor", ["kubernetes"]),
         ],
     )
-    def test_cli_parser_executors(self, executor, expected_args):
+    def test_cli_parser_executors(self, custom_executors, executor, 
expected_args):
         """Test that CLI commands for the configured executor are present"""
         for expected_arg in expected_args:
             with (
diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py
index b19d6ff0cf2..efc88d476ea 100644
--- a/tests/core/test_configuration.py
+++ b/tests/core/test_configuration.py
@@ -1594,8 +1594,8 @@ def test_restore_and_reload_provider_configuration():
     assert conf.get("celery", "celery_app_name") == 
"airflow.providers.celery.executors.celery_executor"
     conf.restore_core_default_configuration()
     assert conf.providers_configuration_loaded is False
-    # built-in pre-2-7 celery executor
-    assert conf.get("celery", "celery_app_name") == 
"airflow.executors.celery_executor"
+    with pytest.raises(AirflowConfigException, match="not found in config"):
+        conf.get("celery", "celery_app_name")
     conf.load_providers_configuration()
     assert conf.providers_configuration_loaded is True
     assert conf.get("celery", "celery_app_name") == 
"airflow.providers.celery.executors.celery_executor"
diff --git a/tests/executors/test_executor_loader.py 
b/tests/executors/test_executor_loader.py
index 48c531c5cb9..d783952b66c 100644
--- a/tests/executors/test_executor_loader.py
+++ b/tests/executors/test_executor_loader.py
@@ -26,7 +26,7 @@ from airflow.executors import executor_loader
 from airflow.executors.executor_loader import ConnectorSource, ExecutorLoader, 
ExecutorName
 from airflow.executors.local_executor import LocalExecutor
 from airflow.providers.amazon.aws.executors.ecs.ecs_executor import 
AwsEcsExecutor
-from airflow.providers.celery.executors.celery_executor import CeleryExecutor
+from airflow.providers_manager import ProvidersManager
 
 from tests_common.test_utils.config import conf_vars
 
@@ -141,6 +141,11 @@ class TestExecutorLoader:
             assert executors == expected_executors_list
 
     def test_init_executors(self):
+        # We need to init provider config in order to import CeleryExecutor
+        ProvidersManager().initialize_providers_configuration()
+
+        from airflow.providers.celery.executors.celery_executor import 
CeleryExecutor
+
         with conf_vars({("core", "executor"): "CeleryExecutor"}):
             executors = ExecutorLoader.init_executors()
             executor_name = ExecutorLoader.get_default_executor_name()
diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py
index 9bb4f5b9934..53096576384 100644
--- a/tests/sensors/test_base.py
+++ b/tests/sensors/test_base.py
@@ -33,25 +33,11 @@ from airflow.exceptions import (
     AirflowTaskTimeout,
 )
 from airflow.executors.debug_executor import DebugExecutor
-from airflow.executors.executor_constants import (
-    CELERY_EXECUTOR,
-    CELERY_KUBERNETES_EXECUTOR,
-    DEBUG_EXECUTOR,
-    KUBERNETES_EXECUTOR,
-    LOCAL_EXECUTOR,
-    LOCAL_KUBERNETES_EXECUTOR,
-    SEQUENTIAL_EXECUTOR,
-)
 from airflow.executors.local_executor import LocalExecutor
-from airflow.executors.sequential_executor import SequentialExecutor
 from airflow.models import TaskInstance, TaskReschedule
 from airflow.models.trigger import TriggerFailureReason
 from airflow.models.xcom import XCom
 from airflow.operators.empty import EmptyOperator
-from airflow.providers.celery.executors.celery_executor import CeleryExecutor
-from airflow.providers.celery.executors.celery_kubernetes_executor import 
CeleryKubernetesExecutor
-from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import 
KubernetesExecutor
-from airflow.providers.cncf.kubernetes.executors.local_kubernetes_executor 
import LocalKubernetesExecutor
 from airflow.sensors.base import BaseSensorOperator, PokeReturnValue, 
poke_mode_only
 from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
 from airflow.utils import timezone
@@ -1022,31 +1008,20 @@ class TestBaseSensor:
         assert actual_xcom_value is None
 
     @pytest.mark.parametrize(
-        "executor_cls_mode",
+        "executor_cls, expected_mode",
         [
-            (CeleryExecutor, "poke"),
-            (CeleryKubernetesExecutor, "poke"),
             (DebugExecutor, "reschedule"),
-            (KubernetesExecutor, "poke"),
             (LocalExecutor, "poke"),
-            (LocalKubernetesExecutor, "poke"),
-            (SequentialExecutor, "poke"),
         ],
         ids=[
-            CELERY_EXECUTOR,
-            CELERY_KUBERNETES_EXECUTOR,
-            DEBUG_EXECUTOR,
-            KUBERNETES_EXECUTOR,
-            LOCAL_EXECUTOR,
-            LOCAL_KUBERNETES_EXECUTOR,
-            SEQUENTIAL_EXECUTOR,
+            "debug_executor_reschedule",
+            "local_executor_poke",
         ],
     )
-    def test_prepare_for_execution(self, executor_cls_mode):
+    def test_change_sensor_mode_to_reschedule(self, executor_cls, 
expected_mode):
         """
         Should change mode of the task to reschedule if using DEBUG_EXECUTOR
         """
-        executor_cls, mode = executor_cls_mode
         sensor = DummySensor(
             task_id=SENSOR_OP,
             return_value=None,
@@ -1060,7 +1035,7 @@ class TestBaseSensor:
         ) as load_executor:
             load_executor.return_value = (executor_cls, None)
             task = sensor.prepare_for_execution()
-            assert task.mode == mode
+            assert task.mode == expected_mode
 
     def test_resume_execution(self):
         op = BaseSensorOperator(task_id="hi")
diff --git a/tests/www/views/test_views_tasks.py 
b/tests/www/views/test_views_tasks.py
index 11c155d3d60..4c05c46152b 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -35,7 +35,6 @@ from airflow.models.taskinstance import TaskInstance
 from airflow.models.taskreschedule import TaskReschedule
 from airflow.models.xcom import XCom
 from airflow.operators.empty import EmptyOperator
-from airflow.providers.celery.executors.celery_executor import CeleryExecutor
 from airflow.security import permissions
 from airflow.utils import timezone
 from airflow.utils.log.logging_mixin import ExternalLoggingMixin
@@ -610,11 +609,6 @@ def test_dag_never_run(admin_client, url):
     check_content_in_response(f"Cannot mark tasks as {url}, seem that DAG 
{dag_id} has never run", resp)
 
 
-class _ForceHeartbeatCeleryExecutor(CeleryExecutor):
-    def heartbeat(self):
-        return True
-
-
 def test_delete_dag_button_for_dag_on_scheduler_only(admin_client, dag_maker):
     with dag_maker() as dag:
         EmptyOperator(task_id="task")

Reply via email to