This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cc9b9a8794d Revert "Remove fallback for old, pre 2.7, providers (#44755)" (#44886)
cc9b9a8794d is described below
commit cc9b9a8794da50630e26f5de04b4c0ae5d49353f
Author: Jarek Potiuk <[email protected]>
AuthorDate: Thu Dec 12 18:59:55 2024 +0100
Revert "Remove fallback for old, pre 2.7, providers (#44755)" (#44886)
This reverts commit 02821cc369205b7df5553b79dc19969f63ba0a95.
---
.../cli/commands/remote_commands/task_command.py | 1 -
airflow/config_templates/pre_2_7_defaults.cfg | 105 +++++++++++++++++++++
airflow/configuration.py | 26 ++++-
airflow/settings.py | 2 +-
airflow/utils/db.py | 10 +-
airflow/utils/log/file_task_handler.py | 8 ++
newsfragments/44755.significant.rst | 26 -----
tests/cli/conftest.py | 44 ++++-----
tests/cli/test_cli_parser.py | 7 +-
tests/core/test_configuration.py | 4 +-
tests/executors/test_executor_loader.py | 7 +-
tests/sensors/test_base.py | 35 ++++++-
tests/www/views/test_views_tasks.py | 6 ++
13 files changed, 203 insertions(+), 78 deletions(-)
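For context before the diffs: this revert restores a read-only layer of pre-2.7 provider defaults that AirflowConfigParser consults after its regular defaults. A minimal sketch of that lookup order, assuming plain ConfigParser sources (the real parser in airflow/configuration.py also checks environment variables, "_cmd" options, and secrets backends):

    from configparser import ConfigParser

    def lookup(section: str, key: str, user_cfg: ConfigParser,
               defaults: ConfigParser, pre_2_7_defaults: ConfigParser) -> str | None:
        # Later sources are only consulted when earlier ones have no value.
        for source in (user_cfg, defaults, pre_2_7_defaults):
            value = source.get(section, key, fallback=None)
            if value is not None:
                return value
        return None  # the real parser logs "section/key ... not found in config"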
diff --git a/airflow/cli/commands/remote_commands/task_command.py b/airflow/cli/commands/remote_commands/task_command.py
index 60691217dd5..b2795b7bf9d 100644
--- a/airflow/cli/commands/remote_commands/task_command.py
+++ b/airflow/cli/commands/remote_commands/task_command.py
@@ -405,7 +405,6 @@ class TaskCommandMarker:
@cli_utils.action_cli(check_db=False)
-@providers_configuration_loaded
def task_run(args, dag: DAG | None = None) -> TaskReturnCode | None:
"""
Run a single task instance.
diff --git a/airflow/config_templates/pre_2_7_defaults.cfg b/airflow/config_templates/pre_2_7_defaults.cfg
new file mode 100644
index 00000000000..2568d42445f
--- /dev/null
+++ b/airflow/config_templates/pre_2_7_defaults.cfg
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This file contains pre-Airflow 2.7 provider defaults for Airflow configuration.
+# They are provided as a fallback option for older versions of the
+# providers that might expect them to be present.
+#
+# NOTE !!!! Please DO NOT modify values in this file even if they change in corresponding
+# providers. The values here should be treated as "read only" and should not be modified
+# even if defaults in newer versions of the corresponding providers change.
+# They are only here so that backwards-compatible behaviour for old provider
+# versions can be maintained.
+
+[atlas]
+sasl_enabled = False
+host =
+port = 21000
+username =
+password =
+
+[hive]
+default_hive_mapred_queue =
+
+[local_kubernetes_executor]
+kubernetes_queue = kubernetes
+
+[celery_kubernetes_executor]
+kubernetes_queue = kubernetes
+
+[celery]
+celery_app_name = airflow.executors.celery_executor
+worker_concurrency = 16
+worker_prefetch_multiplier = 1
+worker_enable_remote_control = true
+broker_url = redis://redis:6379/0
+result_backend_sqlalchemy_engine_options =
+flower_host = 0.0.0.0
+flower_url_prefix =
+flower_port = 5555
+flower_basic_auth =
+sync_parallelism = 0
+celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
+ssl_active = False
+ssl_key =
+ssl_cert =
+ssl_cacert =
+pool = prefork
+operation_timeout = 1.0
+task_track_started = True
+task_publish_max_retries = 3
+worker_precheck = False
+
+[elasticsearch]
+host =
+log_id_template = {dag_id}-{task_id}-{run_id}-{map_index}-{try_number}
+end_of_log_mark = end_of_log
+frontend =
+write_stdout = False
+json_format = False
+json_fields = asctime, filename, lineno, levelname, message
+host_field = host
+offset_field = offset
+index_patterns = _all
+
+[elasticsearch_configs]
+use_ssl = False
+verify_certs = True
+
+[kubernetes_executor]
+api_client_retry_configuration =
+logs_task_metadata = False
+pod_template_file =
+worker_container_repository =
+worker_container_tag =
+namespace = default
+delete_worker_pods = True
+delete_worker_pods_on_failure = False
+worker_pods_creation_batch_size = 1
+multi_namespace_mode = False
+multi_namespace_mode_namespace_list =
+in_cluster = True
+kube_client_request_args =
+delete_option_kwargs =
+enable_tcp_keepalive = True
+tcp_keep_idle = 120
+tcp_keep_intvl = 30
+tcp_keep_cnt = 6
+verify_ssl = True
+worker_pods_queued_check_interval = 60
+ssl_ca_cert =
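The file above is plain INI and is loaded with a stock ConfigParser (see create_pre_2_7_defaults in the configuration.py hunk below). A quick illustration of reading it, with the path hard-coded here only for the example:

    from configparser import ConfigParser

    parser = ConfigParser()
    parser.read("airflow/config_templates/pre_2_7_defaults.cfg")
    print(parser.get("celery", "worker_concurrency"))  # -> "16"
    print(parser.get("celery", "ssl_active"))          # -> "False" (a string, not a bool)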
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 47d2361dfaf..76e2690ffe1 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -209,6 +209,7 @@ class AirflowConfigParser(ConfigParser):
# interpolation placeholders. The _default_values config parser will interpolate them
# properly when we call get() on it.
self._default_values = create_default_config_parser(self.configuration_description)
+ self._pre_2_7_default_values = create_pre_2_7_defaults()
if default_config is not None:
self._update_defaults_from_string(default_config)
self._update_logging_deprecated_template_to_one_from_defaults()
@@ -291,6 +292,10 @@ class AirflowConfigParser(ConfigParser):
return value.replace("%", "%%")
return value
+ def get_default_pre_2_7_value(self, section: str, key: str, **kwargs) -> Any:
+ """Get pre 2.7 default config values."""
+ return self._pre_2_7_default_values.get(section, key, fallback=None, **kwargs)
+
# These configuration elements can be fetched as the stdout of commands
# following the "{section}__{name}_cmd" pattern, the idea behind this
# is to not store password on boxes in text files.
@@ -449,8 +454,6 @@ class AirflowConfigParser(ConfigParser):
elif not with_providers and self._providers_configuration_loaded:
reload_providers_when_leaving = True
self.restore_core_default_configuration()
- elif self._providers_configuration_loaded:
- reload_providers_when_leaving = True
yield
if reload_providers_when_leaving:
self.load_providers_configuration()
@@ -986,6 +989,10 @@ class AirflowConfigParser(ConfigParser):
if self.get_default_value(section, key) is not None or "fallback" in kwargs:
return expand_env_var(self.get_default_value(section, key, **kwargs))
+ if self.get_default_pre_2_7_value(section, key) is not None:
+ # no expansion needed
+ return self.get_default_pre_2_7_value(section, key, **kwargs)
+
if not suppress_warnings:
log.warning("section/key [%s/%s] not found in config", section, key)
@@ -1375,6 +1382,7 @@ class AirflowConfigParser(ConfigParser):
# We check sequentially all those sources and the last one we saw it in will "win"
configs: Iterable[tuple[str, ConfigParser]] = [
+ ("default-pre-2-7", self._pre_2_7_default_values),
("default", self._default_values),
("airflow.cfg", self),
]
@@ -1893,6 +1901,20 @@ def create_default_config_parser(configuration_description: dict[str, dict[str,
return parser
+def create_pre_2_7_defaults() -> ConfigParser:
+ """
+ Create parser using the old defaults from Airflow < 2.7.0.
+
+ This is used in order to be able to fall back to those defaults when an old version of a provider,
+ one not supporting "config contribution", is installed with Airflow 2.7.0+. This "default"
+ configuration does not support variable expansion; these are pretty much hard-coded defaults
+ we want to fall back to in such a case.
+ """
+ config_parser = ConfigParser()
+ config_parser.read(_default_config_file_path("pre_2_7_defaults.cfg"))
+ return config_parser
+
+
def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
airflow_config = pathlib.Path(AIRFLOW_CONFIG)
if airflow_config.is_dir():
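The net effect of the configuration.py changes is that provider-owned keys resolve to the frozen pre-2.7 values instead of raising when provider configuration is not loaded, which the test_configuration.py hunk further down asserts. A usage sketch of that behavior:

    from airflow.configuration import conf

    # With only core defaults loaded, a provider-owned key now falls back to
    # the frozen pre-2.7 default instead of raising AirflowConfigException.
    conf.restore_core_default_configuration()
    assert conf.get("celery", "celery_app_name") == "airflow.executors.celery_executor"

    # Once provider configuration is loaded, the provider's own default wins.
    conf.load_providers_configuration()
    assert conf.get("celery", "celery_app_name") == "airflow.providers.celery.executors.celery_executor"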
diff --git a/airflow/settings.py b/airflow/settings.py
index e9b560d5f97..afde6d68d7d 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -535,7 +535,7 @@ def validate_session():
"""Validate ORM Session."""
global engine
- worker_precheck = conf.getboolean("celery", "worker_precheck", fallback=False)
+ worker_precheck = conf.getboolean("celery", "worker_precheck")
if not worker_precheck:
return True
else:
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 3f30ed0a642..fe5c4c0a791 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -950,12 +950,8 @@ def synchronize_log_template(*, session: Session = NEW_SESSION) -> None:
log.info("Log template table does not exist (added in 2.3.0); skipping log template sync.")
return
- es_log_it_template_fallback = "{dag_id}-{task_id}-{execution_date}-{try_number}"
-
filename = conf.get("logging", "log_filename_template")
- elasticsearch_id = conf.get("elasticsearch", "log_id_template", fallback=es_log_it_template_fallback)
-
- # TODO: The elasticsearch specific stuff here is probably inappropriate - provider is bleeding into core
+ elasticsearch_id = conf.get("elasticsearch", "log_id_template")
stored = session.execute(
select(
@@ -969,9 +965,7 @@ def synchronize_log_template(*, session: Session = NEW_SESSION) -> None:
# If we have an empty table, and the default values exist, we will seed the
# table with values from pre 2.3.0, so old logs will still be retrievable.
if not stored:
- is_default_log_id = elasticsearch_id == conf.get_default_value(
- "elasticsearch", "log_id_template",
fallback=es_log_it_template_fallback
- )
+ is_default_log_id = elasticsearch_id == conf.get_default_value("elasticsearch", "log_id_template")
is_default_filename = filename == conf.get_default_value("logging", "log_filename_template")
if is_default_log_id and is_default_filename:
session.add(
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index e1d990d6e82..09866de7214 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -404,6 +404,14 @@ class FileTaskHandler(logging.Handler):
out_message = logs if "log_pos" in (metadata or {}) else messages + logs
return out_message, {"end_of_log": end_of_log, "log_pos": log_pos}
+ @staticmethod
+ def _get_pod_namespace(ti: TaskInstance):
+ pod_override = ti.executor_config.get("pod_override")
+ namespace = None
+ with suppress(Exception):
+ namespace = pod_override.metadata.namespace
+ return namespace or conf.get("kubernetes_executor", "namespace")
+
def _get_log_retrieval_url(
self, ti: TaskInstance, log_relative_path: str, log_type: LogType | None = None
) -> tuple[str, str]:
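The _get_pod_namespace helper restored above prefers a namespace from the task's pod_override and falls back to the [kubernetes_executor] namespace option. A self-contained sketch of the same pattern, with the executor_config shape assumed for illustration:

    from contextlib import suppress

    def get_pod_namespace(executor_config: dict, default_namespace: str) -> str:
        namespace = None
        # pod_override may be missing, None, or lack metadata; treat all of
        # those as "no override" rather than failing log retrieval.
        with suppress(Exception):
            namespace = executor_config["pod_override"].metadata.namespace
        return namespace or default_namespace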
diff --git a/newsfragments/44755.significant.rst b/newsfragments/44755.significant.rst
deleted file mode 100644
index 38fe6e8bf63..00000000000
--- a/newsfragments/44755.significant.rst
+++ /dev/null
@@ -1,26 +0,0 @@
-Provider configuration fallbacks removed from core
-
-Around ~2.7, we moved provider config from core into the providers themselves.
-However, if providers that were released before that change were used on 2.7+,
-there could be failures because the config wouldn't be in those providers!
-So, we added a fallback for all of the configs that were moved. These fallbacks
-have now been removed. You must use providers that contain their own config.
-
-TODO: It's unlikely this will be the change that sets the "lowest" min provider version
-for compatibility with Airflow 3. But, it is possible, and if it is we will need to
-then determine what those versions are.
-
-Note: This change doesn't really fit any of the following "types", but I've marked
-dependency change so there is something checked.
-
-.. Check the type of change that applies to this change
-
-* Types of change
-
- * [ ] DAG changes
- * [ ] Config changes
- * [ ] API changes
- * [ ] CLI changes
- * [ ] Behaviour changes
- * [ ] Plugin changes
- * [x] Dependency change
diff --git a/tests/cli/conftest.py b/tests/cli/conftest.py
index f8ee6b930f6..17f891c94f8 100644
--- a/tests/cli/conftest.py
+++ b/tests/cli/conftest.py
@@ -23,33 +23,29 @@ import pytest
from airflow.executors import local_executor
from airflow.models.dagbag import DagBag
+from airflow.providers.celery.executors import celery_executor, celery_kubernetes_executor
+from airflow.providers.cncf.kubernetes.executors import kubernetes_executor, local_kubernetes_executor
from tests_common.test_utils.config import conf_vars
-
[email protected]
-def custom_executors():
- from airflow.providers.celery.executors import celery_executor, celery_kubernetes_executor
- from airflow.providers.cncf.kubernetes.executors import kubernetes_executor, local_kubernetes_executor
-
- # Create custom executors here because conftest is imported first
- custom_executor_module = type(sys)("custom_executor")
- custom_executor_module.CustomCeleryExecutor = type( # type: ignore
- "CustomCeleryExecutor", (celery_executor.CeleryExecutor,), {}
- )
- custom_executor_module.CustomCeleryKubernetesExecutor = type( # type: ignore
- "CustomCeleryKubernetesExecutor",
(celery_kubernetes_executor.CeleryKubernetesExecutor,), {}
- )
- custom_executor_module.CustomLocalExecutor = type( # type: ignore
- "CustomLocalExecutor", (local_executor.LocalExecutor,), {}
- )
- custom_executor_module.CustomLocalKubernetesExecutor = type( # type: ignore
- "CustomLocalKubernetesExecutor",
(local_kubernetes_executor.LocalKubernetesExecutor,), {}
- )
- custom_executor_module.CustomKubernetesExecutor = type( # type: ignore
- "CustomKubernetesExecutor", (kubernetes_executor.KubernetesExecutor,),
{}
- )
- sys.modules["custom_executor"] = custom_executor_module
+# Create custom executors here because conftest is imported first
+custom_executor_module = type(sys)("custom_executor")
+custom_executor_module.CustomCeleryExecutor = type( # type: ignore
+ "CustomCeleryExecutor", (celery_executor.CeleryExecutor,), {}
+)
+custom_executor_module.CustomCeleryKubernetesExecutor = type( # type: ignore
+ "CustomCeleryKubernetesExecutor",
(celery_kubernetes_executor.CeleryKubernetesExecutor,), {}
+)
+custom_executor_module.CustomLocalExecutor = type( # type: ignore
+ "CustomLocalExecutor", (local_executor.LocalExecutor,), {}
+)
+custom_executor_module.CustomLocalKubernetesExecutor = type( # type: ignore
+ "CustomLocalKubernetesExecutor",
(local_kubernetes_executor.LocalKubernetesExecutor,), {}
+)
+custom_executor_module.CustomKubernetesExecutor = type( # type: ignore
+ "CustomKubernetesExecutor", (kubernetes_executor.KubernetesExecutor,), {}
+)
+sys.modules["custom_executor"] = custom_executor_module
@pytest.fixture(autouse=True)
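The conftest change above registers a synthetic module via type(sys)(...), which is just types.ModuleType. A standalone sketch of that pattern, with hypothetical names:

    import sys
    import types  # type(sys) is types.ModuleType

    # Build an importable in-memory module, as the conftest does for executors.
    fake = types.ModuleType("fake_executors")
    fake.Greeter = type("Greeter", (object,), {"hello": lambda self: "hi"})
    sys.modules["fake_executors"] = fake

    from fake_executors import Greeter  # now resolvable anywhere in this process
    assert Greeter().hello() == "hi"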
diff --git a/tests/cli/test_cli_parser.py b/tests/cli/test_cli_parser.py
index 3ea3473c3f8..23363b379d8 100644
--- a/tests/cli/test_cli_parser.py
+++ b/tests/cli/test_cli_parser.py
@@ -42,6 +42,7 @@ from airflow.executors import executor_loader
from airflow.executors.executor_utils import ExecutorName
from airflow.executors.local_executor import LocalExecutor
from airflow.providers.amazon.aws.executors.ecs.ecs_executor import AwsEcsExecutor
+from airflow.providers.celery.executors.celery_executor import CeleryExecutor
from tests_common.test_utils.config import conf_vars
@@ -166,7 +167,7 @@ class TestCli:
# force re-evaluation of cli commands (done in top level code)
reload(cli_parser)
- @patch("airflow.providers.celery.executors.celery_executor.CeleryExecutor.get_cli_commands")
+ @patch.object(CeleryExecutor, "get_cli_commands")
@patch.object(AwsEcsExecutor, "get_cli_commands")
def test_hybrid_executor_get_cli_commands(
self, ecs_executor_cli_commands_mock, celery_executor_cli_commands_mock
@@ -200,7 +201,7 @@ class TestCli:
assert celery_executor_command.name in commands
assert ecs_executor_command.name in commands
- @patch("airflow.providers.celery.executors.celery_executor.CeleryExecutor.get_cli_commands")
+ @patch.object(CeleryExecutor, "get_cli_commands")
@patch.object(AwsEcsExecutor, "get_cli_commands")
def test_hybrid_executor_get_cli_commands_with_error(
self, ecs_executor_cli_commands_mock, celery_executor_cli_commands_mock, caplog
@@ -377,7 +378,7 @@ class TestCli:
("custom_executor.CustomKubernetesExecutor", ["kubernetes"]),
],
)
- def test_cli_parser_executors(self, custom_executors, executor, expected_args):
+ def test_cli_parser_executors(self, executor, expected_args):
"""Test that CLI commands for the configured executor are present"""
for expected_arg in expected_args:
with (
diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py
index efc88d476ea..b19d6ff0cf2 100644
--- a/tests/core/test_configuration.py
+++ b/tests/core/test_configuration.py
@@ -1594,8 +1594,8 @@ def test_restore_and_reload_provider_configuration():
assert conf.get("celery", "celery_app_name") ==
"airflow.providers.celery.executors.celery_executor"
conf.restore_core_default_configuration()
assert conf.providers_configuration_loaded is False
- with pytest.raises(AirflowConfigException, match="not found in config"):
- conf.get("celery", "celery_app_name")
+ # built-in pre-2-7 celery executor
+ assert conf.get("celery", "celery_app_name") == "airflow.executors.celery_executor"
conf.load_providers_configuration()
assert conf.providers_configuration_loaded is True
assert conf.get("celery", "celery_app_name") ==
"airflow.providers.celery.executors.celery_executor"
diff --git a/tests/executors/test_executor_loader.py b/tests/executors/test_executor_loader.py
index d783952b66c..48c531c5cb9 100644
--- a/tests/executors/test_executor_loader.py
+++ b/tests/executors/test_executor_loader.py
@@ -26,7 +26,7 @@ from airflow.executors import executor_loader
from airflow.executors.executor_loader import ConnectorSource, ExecutorLoader, ExecutorName
from airflow.executors.local_executor import LocalExecutor
from airflow.providers.amazon.aws.executors.ecs.ecs_executor import AwsEcsExecutor
-from airflow.providers_manager import ProvidersManager
+from airflow.providers.celery.executors.celery_executor import CeleryExecutor
from tests_common.test_utils.config import conf_vars
@@ -141,11 +141,6 @@ class TestExecutorLoader:
assert executors == expected_executors_list
def test_init_executors(self):
- # We need to init provider config in order to import CeleryExecutor
- ProvidersManager().initialize_providers_configuration()
-
- from airflow.providers.celery.executors.celery_executor import CeleryExecutor
-
with conf_vars({("core", "executor"): "CeleryExecutor"}):
executors = ExecutorLoader.init_executors()
executor_name = ExecutorLoader.get_default_executor_name()
diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py
index 53096576384..9bb4f5b9934 100644
--- a/tests/sensors/test_base.py
+++ b/tests/sensors/test_base.py
@@ -33,11 +33,25 @@ from airflow.exceptions import (
AirflowTaskTimeout,
)
from airflow.executors.debug_executor import DebugExecutor
+from airflow.executors.executor_constants import (
+ CELERY_EXECUTOR,
+ CELERY_KUBERNETES_EXECUTOR,
+ DEBUG_EXECUTOR,
+ KUBERNETES_EXECUTOR,
+ LOCAL_EXECUTOR,
+ LOCAL_KUBERNETES_EXECUTOR,
+ SEQUENTIAL_EXECUTOR,
+)
from airflow.executors.local_executor import LocalExecutor
+from airflow.executors.sequential_executor import SequentialExecutor
from airflow.models import TaskInstance, TaskReschedule
from airflow.models.trigger import TriggerFailureReason
from airflow.models.xcom import XCom
from airflow.operators.empty import EmptyOperator
+from airflow.providers.celery.executors.celery_executor import CeleryExecutor
+from airflow.providers.celery.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
+from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor
+from airflow.providers.cncf.kubernetes.executors.local_kubernetes_executor import LocalKubernetesExecutor
from airflow.sensors.base import BaseSensorOperator, PokeReturnValue, poke_mode_only
from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
from airflow.utils import timezone
@@ -1008,20 +1022,31 @@ class TestBaseSensor:
assert actual_xcom_value is None
@pytest.mark.parametrize(
- "executor_cls, expected_mode",
+ "executor_cls_mode",
[
+ (CeleryExecutor, "poke"),
+ (CeleryKubernetesExecutor, "poke"),
(DebugExecutor, "reschedule"),
+ (KubernetesExecutor, "poke"),
(LocalExecutor, "poke"),
+ (LocalKubernetesExecutor, "poke"),
+ (SequentialExecutor, "poke"),
],
ids=[
- "debug_executor_reschedule",
- "local_executor_poke",
+ CELERY_EXECUTOR,
+ CELERY_KUBERNETES_EXECUTOR,
+ DEBUG_EXECUTOR,
+ KUBERNETES_EXECUTOR,
+ LOCAL_EXECUTOR,
+ LOCAL_KUBERNETES_EXECUTOR,
+ SEQUENTIAL_EXECUTOR,
],
)
- def test_change_sensor_mode_to_reschedule(self, executor_cls, expected_mode):
+ def test_prepare_for_execution(self, executor_cls_mode):
"""
Should change mode of the task to reschedule if using DEBUG_EXECUTOR
"""
+ executor_cls, mode = executor_cls_mode
sensor = DummySensor(
task_id=SENSOR_OP,
return_value=None,
@@ -1035,7 +1060,7 @@ class TestBaseSensor:
) as load_executor:
load_executor.return_value = (executor_cls, None)
task = sensor.prepare_for_execution()
- assert task.mode == expected_mode
+ assert task.mode == mode
def test_resume_execution(self):
op = BaseSensorOperator(task_id="hi")
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index 4c05c46152b..11c155d3d60 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -35,6 +35,7 @@ from airflow.models.taskinstance import TaskInstance
from airflow.models.taskreschedule import TaskReschedule
from airflow.models.xcom import XCom
from airflow.operators.empty import EmptyOperator
+from airflow.providers.celery.executors.celery_executor import CeleryExecutor
from airflow.security import permissions
from airflow.utils import timezone
from airflow.utils.log.logging_mixin import ExternalLoggingMixin
@@ -609,6 +610,11 @@ def test_dag_never_run(admin_client, url):
check_content_in_response(f"Cannot mark tasks as {url}, seem that DAG {dag_id} has never run", resp)
+class _ForceHeartbeatCeleryExecutor(CeleryExecutor):
+ def heartbeat(self):
+ return True
+
+
def test_delete_dag_button_for_dag_on_scheduler_only(admin_client, dag_maker):
with dag_maker() as dag:
EmptyOperator(task_id="task")