This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch v2-11-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-11-test by this push:
new a0062c8a509 Backport `airflow config update` and `airflow config lint`
changes (#50353)
a0062c8a509 is described below
commit a0062c8a5092c2c2e6587a5d6b0bbfd75e12b58f
Author: Ankit Chaurasia <[email protected]>
AuthorDate: Fri May 9 14:47:31 2025 +0545
Backport `airflow config update` and `airflow config lint` changes (#50353)
* Fix Legacy configuration not removed or updated (#46544)
* Fix Legacy configuration not removed or updated
(cherry picked from commit 971973725bd368297fda8dbe096ed6b199440ad0)
* allow_trigger_in_future config removal (#46667)
(cherry picked from commit 65bacad3199d302ba5d9ddb8486f7029e6b171f1)
* Backport airflow config update and airflow config lint changes
* Rename pre-2-7 defaults to provider_config_fallback_defaults (#44895)
* Rename pre-2-7 defaults to provider_config_fallback_defaults
After unsuccessful attempts to remove them, renaming them and adding
an appropriate warning against removing them seems to be a good idea.
* Update airflow/config_templates/provider_config_fallback_defaults.cfg
Co-authored-by: Jed Cunningham
<[email protected]>
---------
Co-authored-by: Jed Cunningham
<[email protected]>
(cherry picked from commit 8c013b6a89da81832270841dea529f86cbe42c6f)
* Fix `lookup_from_deprecated_options` in AirflowConfigParser (#47004)
* Fix deprecated_options in AirflowConfigParser
* Add test_deprecated_options_with_lookup_from_deprecated, rename as
lookup_from_deprecated
* fixup! Add docstring for lookup_from_deprecated
* Fix fixture params naming
(cherry picked from commit 8e3d25f909756b7438092ace915293b443026d37)
* Fix the tests
---------
Co-authored-by: LIU ZHE YOU <[email protected]>
Co-authored-by: Phani Kumar <[email protected]>
Co-authored-by: Jarek Potiuk <[email protected]>
Co-authored-by: Jed Cunningham
<[email protected]>
---
airflow/cli/cli_config.py | 49 +-
airflow/cli/commands/config_command.py | 546 ++++++++++++++++++++-
...s.cfg => provider_config_fallback_defaults.cfg} | 45 +-
airflow/configuration.py | 241 ++++++---
tests/cli/commands/test_config_command.py | 148 +++++-
tests/core/test_configuration.py | 42 ++
6 files changed, 986 insertions(+), 85 deletions(-)
diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index ce753e2f4bb..ccc81ffc77c 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -973,6 +973,39 @@ ARG_LINT_CONFIG_IGNORE_OPTION = Arg(
type=string_list_type,
)
+# config update
+ARG_UPDATE_CONFIG_SECTION = Arg(
+ ("--section",),
+ help="The section name(s) to update in the airflow config.",
+ type=string_list_type,
+)
+ARG_UPDATE_CONFIG_OPTION = Arg(
+ ("--option",),
+ help="The option name(s) to update in the airflow config.",
+ type=string_list_type,
+)
+ARG_UPDATE_CONFIG_IGNORE_SECTION = Arg(
+ ("--ignore-section",),
+ help="The section name(s) to ignore to update in the airflow config.",
+ type=string_list_type,
+)
+ARG_UPDATE_CONFIG_IGNORE_OPTION = Arg(
+ ("--ignore-option",),
+ help="The option name(s) to ignore to update in the airflow config.",
+ type=string_list_type,
+)
+ARG_UPDATE_CONFIG_FIX = Arg(
+ ("--fix",),
+ help="Automatically apply the configuration changes instead of performing
a dry run. (Default: dry-run mode)",
+ action="store_true",
+)
+
+ARG_UPDATE_ALL_RECOMMENDATIONS = Arg(
+ ("--all-recommendations",),
+ help="Include non-breaking (recommended) changes along with breaking ones.
(Also use with --fix)",
+ action="store_true",
+)
+
# kubernetes cleanup-pods
ARG_NAMESPACE = Arg(
("--namespace",),
@@ -1894,7 +1927,7 @@ CONFIG_COMMANDS = (
ActionCommand(
name="lint",
help="lint options for the configuration changes while migrating from
Airflow 2.x to Airflow 3.0",
-
func=lazy_load_command("airflow.cli.commands.remote_commands.config_command.lint_config"),
+
func=lazy_load_command("airflow.cli.commands.config_command.lint_config"),
args=(
ARG_LINT_CONFIG_SECTION,
ARG_LINT_CONFIG_OPTION,
@@ -1903,6 +1936,20 @@ CONFIG_COMMANDS = (
ARG_VERBOSE,
),
),
+ ActionCommand(
+ name="update",
+ help="update options for the configuration changes while migrating
from Airflow 2.x to Airflow 3.0",
+
func=lazy_load_command("airflow.cli.commands.config_command.update_config"),
+ args=(
+ ARG_UPDATE_CONFIG_SECTION,
+ ARG_UPDATE_CONFIG_OPTION,
+ ARG_UPDATE_CONFIG_IGNORE_SECTION,
+ ARG_UPDATE_CONFIG_IGNORE_OPTION,
+ ARG_VERBOSE,
+ ARG_UPDATE_CONFIG_FIX,
+ ARG_UPDATE_ALL_RECOMMENDATIONS,
+ ),
+ ),
)
KUBERNETES_COMMANDS = (
diff --git a/airflow/cli/commands/config_command.py
b/airflow/cli/commands/config_command.py
index 36be0cd2e06..0f5c5df5ba1 100644
--- a/airflow/cli/commands/config_command.py
+++ b/airflow/cli/commands/config_command.py
@@ -18,15 +18,16 @@
from __future__ import annotations
+import shutil
from dataclasses import dataclass
from io import StringIO
-from typing import NamedTuple
+from typing import Any, NamedTuple
import pygments
from pygments.lexers.configs import IniLexer
from airflow.cli.simple_table import AirflowConsole
-from airflow.configuration import conf
+from airflow.configuration import AIRFLOW_CONFIG, ConfigModifications, conf
from airflow.exceptions import AirflowConfigException
from airflow.utils.cli import should_use_colors
from airflow.utils.code_utils import get_terminal_formatter
@@ -82,19 +83,41 @@ class ConfigChange:
Class representing the configuration changes in Airflow 3.0.
:param config: The configuration parameter being changed.
+ :param default_change: If the change is a default value change.
+ :param old_default: The old default value (valid only if default_change is
True).
+ :param new_default: The new default value for the configuration parameter.
:param suggestion: A suggestion for replacing or handling the removed
configuration.
:param renamed_to: The new section and option if the configuration is
renamed.
:param was_deprecated: If the config is removed, whether the old config
was deprecated.
+ :param was_removed: If the config is removed.
+ :param is_invalid_if: If the current config value is invalid in the future.
+ :param breaking: Mark if this change is known to be breaking and causing
errors/ warnings / deprecations.
+ :param remove_if_equals: For removal rules, remove the option only if its
current value equals this value.
"""
config: ConfigParameter
+ default_change: bool = False
+ old_default: str | bool | int | float | None = None
+ new_default: str | bool | int | float | None = None
suggestion: str = ""
renamed_to: ConfigParameter | None = None
was_deprecated: bool = True
+ was_removed: bool = True
+ is_invalid_if: Any = None
+ breaking: bool = False
+ remove_if_equals: str | bool | int | float | None = None
@property
- def message(self) -> str:
+ def message(self) -> str | None:
"""Generate a message for this configuration change."""
+ if self.default_change:
+ value = conf.get(self.config.section, self.config.option)
+ if value != self.new_default:
+ return (
+ f"Changed default value of `{self.config.option}` in
`{self.config.section}` "
+ f"from `{self.old_default}` to `{self.new_default}`. "
+ f"You currently have `{value}` set. {self.suggestion}"
+ )
if self.renamed_to:
if self.config.section != self.renamed_to.section:
return (
@@ -105,11 +128,20 @@ class ConfigChange:
f"`{self.config.option}` configuration parameter renamed to
`{self.renamed_to.option}` "
f"in the `{self.config.section}` section."
)
- return (
- f"Removed{' deprecated' if self.was_deprecated else ''}
`{self.config.option}` configuration parameter "
- f"from `{self.config.section}` section. "
- f"{self.suggestion}"
- )
+ if self.was_removed and not self.remove_if_equals:
+ return (
+ f"Removed{' deprecated' if self.was_deprecated else ''}
`{self.config.option}` configuration parameter "
+ f"from `{self.config.section}` section. "
+ f"{self.suggestion}"
+ )
+ if self.is_invalid_if is not None:
+ value = conf.get(self.config.section, self.config.option)
+ if value == self.is_invalid_if:
+ return (
+ f"Invalid value `{self.is_invalid_if}` set for
`{self.config.option}` configuration parameter "
+ f"in `{self.config.section}` section. {self.suggestion}"
+ )
+ return None
CONFIGS_CHANGES = [
@@ -123,6 +155,19 @@ CONFIGS_CHANGES = [
renamed_to=ConfigParameter("core", "sensitive_var_conn_names"),
),
# core
+ ConfigChange(
+ config=ConfigParameter("core", "executor"),
+ default_change=True,
+ old_default="SequentialExecutor",
+ new_default="LocalExecutor",
+ was_removed=False,
+ breaking=True,
+ ),
+ ConfigChange(
+ config=ConfigParameter("core", "hostname"),
+ was_removed=True,
+ remove_if_equals=":",
+ ),
ConfigChange(
config=ConfigParameter("core", "check_slas"),
suggestion="The SLA feature is removed in Airflow 3.0, to be replaced
with Airflow Alerts in future",
@@ -132,6 +177,14 @@ CONFIGS_CHANGES = [
suggestion="Dataset URI with a defined scheme will now always be
validated strictly, "
"raising a hard error on validation failure.",
),
+ ConfigChange(
+ config=ConfigParameter("core", "dag_default_view"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("core", "dag_orientation"),
+ was_deprecated=False,
+ ),
ConfigChange(
config=ConfigParameter("core", "dataset_manager_class"),
renamed_to=ConfigParameter("core", "asset_manager_class"),
@@ -212,6 +265,12 @@ CONFIGS_CHANGES = [
ConfigChange(
config=ConfigParameter("core", "log_processor_filename_template"),
),
+ ConfigChange(
+ config=ConfigParameter("core", "parallelism"),
+ was_removed=False,
+ is_invalid_if="0",
+ suggestion="Please set the `parallelism` configuration parameter to a
value greater than 0.",
+ ),
# api
ConfigChange(
config=ConfigParameter("api", "access_control_allow_origin"),
@@ -219,7 +278,11 @@ CONFIGS_CHANGES = [
),
ConfigChange(
config=ConfigParameter("api", "auth_backend"),
- renamed_to=ConfigParameter("api", "auth_backends"),
+ renamed_to=ConfigParameter("fab", "auth_backends"),
+ ),
+ ConfigChange(
+ config=ConfigParameter("api", "auth_backends"),
+ renamed_to=ConfigParameter("fab", "auth_backends"),
),
# logging
ConfigChange(
@@ -239,6 +302,18 @@ CONFIGS_CHANGES = [
config=ConfigParameter("logging", "log_processor_filename_template"),
was_deprecated=False,
),
+ ConfigChange(
+ config=ConfigParameter("logging", "log_filename_template"),
+ was_removed=True,
+ remove_if_equals="{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{
try_number }}.log",
+ breaking=True,
+ ),
+ ConfigChange(
+ config=ConfigParameter("logging", "log_filename_template"),
+ was_removed=True,
+ remove_if_equals="dag_id={{ ti.dag_id }}/run_id={{ ti.run_id
}}/task_id={{ ti.task_id }}/{% if ti.map_index >= 0 %}map_index={{ ti.map_index
}}/{% endif %}attempt={{ try_number }}.log",
+ breaking=True,
+ ),
# metrics
ConfigChange(
config=ConfigParameter("metrics", "metrics_use_pattern_match"),
@@ -284,19 +359,207 @@ CONFIGS_CHANGES = [
config=ConfigParameter("webserver", option="auth_rate_limit"),
renamed_to=ConfigParameter("fab", "auth_rate_limit"),
),
+ ConfigChange(
+ config=ConfigParameter("webserver", "config_file"),
+ renamed_to=ConfigParameter("fab", "config_file"),
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "session_backend"),
+ renamed_to=ConfigParameter("fab", "session_backend"),
+ ),
ConfigChange(
config=ConfigParameter("webserver", "session_lifetime_days"),
- renamed_to=ConfigParameter("webserver", "session_lifetime_minutes"),
+ renamed_to=ConfigParameter("fab", "session_lifetime_minutes"),
),
ConfigChange(
config=ConfigParameter("webserver", "force_log_out_after"),
- renamed_to=ConfigParameter("webserver", "session_lifetime_minutes"),
+ renamed_to=ConfigParameter("fab", "session_lifetime_minutes"),
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "session_lifetime_minutes"),
+ renamed_to=ConfigParameter("fab", "session_lifetime_minutes"),
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "base_url"),
+ renamed_to=ConfigParameter("api", "base_url"),
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "web_server_host"),
+ renamed_to=ConfigParameter("api", "host"),
+ breaking=True,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "web_server_port"),
+ renamed_to=ConfigParameter("api", "port"),
+ breaking=True,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "workers"),
+ renamed_to=ConfigParameter("api", "workers"),
+ breaking=True,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "web_server_worker_timeout"),
+ renamed_to=ConfigParameter("api", "worker_timeout"),
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "web_server_ssl_cert"),
+ renamed_to=ConfigParameter("api", "ssl_cert"),
+ breaking=True,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "web_server_ssl_key"),
+ renamed_to=ConfigParameter("api", "ssl_key"),
+ breaking=True,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "access_logfile"),
+ renamed_to=ConfigParameter("api", "access_logfile"),
+ breaking=True,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "error_logfile"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "access_logformat"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "web_server_master_timeout"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "worker_refresh_batch_size"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "worker_refresh_interval"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "reload_on_plugin_change"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "worker_class"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "expose_stacktrace"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "log_fetch_delay_sec"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "log_auto_tailing_offset"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "log_animation_speed"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "default_dag_run_display_number"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "enable_proxy_fix"),
+ renamed_to=ConfigParameter("fab", "enable_proxy_fix"),
+ breaking=True,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "proxy_fix_x_for"),
+ renamed_to=ConfigParameter("fab", "proxy_fix_x_for"),
+ breaking=True,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "proxy_fix_x_proto"),
+ renamed_to=ConfigParameter("fab", "proxy_fix_x_proto"),
+ breaking=True,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "proxy_fix_x_host"),
+ renamed_to=ConfigParameter("fab", "proxy_fix_x_host"),
+ breaking=True,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "proxy_fix_x_port"),
+ renamed_to=ConfigParameter("fab", "proxy_fix_x_port"),
+ breaking=True,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "proxy_fix_x_prefix"),
+ renamed_to=ConfigParameter("fab", "proxy_fix_x_prefix"),
+ breaking=True,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "expose_config"),
+ renamed_to=ConfigParameter("api", "expose_config"),
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "cookie_secure"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "analytics_tool"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "analytics_id"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "analytics_url"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver",
"show_recent_stats_for_completed_runs"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "run_internal_api"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "caching_hash_method"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "show_trigger_form_if_no_params"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver",
"num_recent_configurations_for_trigger"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "allowed_payload_size"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "max_form_memory_size"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "max_form_parts"),
+ was_deprecated=False,
+ ),
+ ConfigChange(
+ config=ConfigParameter("webserver", "default_ui_timezone"),
+ was_deprecated=False,
),
# policy
ConfigChange(
config=ConfigParameter("policy", "airflow_local_settings"),
renamed_to=ConfigParameter("policy", "task_policy"),
),
+ ConfigChange(
+ config=ConfigParameter("webserver", "navbar_logo_text_color"),
+ was_deprecated=False,
+ ),
# scheduler
ConfigChange(
config=ConfigParameter("scheduler", "dependency_detector"),
@@ -304,6 +567,36 @@ CONFIGS_CHANGES = [
ConfigChange(
config=ConfigParameter("scheduler", "allow_trigger_in_future"),
),
+ ConfigChange(
+ config=ConfigParameter("scheduler", "catchup_by_default"),
+ default_change=True,
+ old_default="True",
+ was_removed=False,
+ new_default="False",
+ suggestion="In Airflow 3.0 the default value for `catchup_by_default`
is set to `False`. "
+ "This means that DAGs without explicit definition of the `catchup`
parameter will not "
+ "catchup by default. "
+ "If your DAGs rely on catchup behavior, not explicitly defined in the
DAG definition, "
+ "set this configuration parameter to `True` in the `scheduler` section
of your `airflow.cfg` "
+ "to enable the behavior from Airflow 2.x.",
+ breaking=True,
+ ),
+ ConfigChange(
+ config=ConfigParameter("scheduler", "create_cron_data_intervals"),
+ default_change=True,
+ old_default="True",
+ new_default="False",
+ was_removed=False,
+ breaking=True,
+ ),
+ ConfigChange(
+ config=ConfigParameter("scheduler", "create_delta_data_intervals"),
+ default_change=True,
+ old_default="True",
+ new_default="False",
+ was_removed=False,
+ breaking=True,
+ ),
ConfigChange(
config=ConfigParameter("scheduler", "processor_poll_interval"),
renamed_to=ConfigParameter("scheduler", "scheduler_idle_sleep_time"),
@@ -382,6 +675,23 @@ CONFIGS_CHANGES = [
ConfigChange(
config=ConfigParameter("scheduler", "dag_dir_list_interval"),
renamed_to=ConfigParameter("dag_processor", "refresh_interval"),
+ breaking=True,
+ ),
+ ConfigChange(
+ config=ConfigParameter("scheduler", "local_task_job_heartbeat_sec"),
+ renamed_to=ConfigParameter("scheduler", "task_instance_heartbeat_sec"),
+ ),
+ ConfigChange(
+ config=ConfigParameter("scheduler", "scheduler_zombie_task_threshold"),
+ renamed_to=ConfigParameter("scheduler",
"task_instance_heartbeat_timeout"),
+ ),
+ ConfigChange(
+ config=ConfigParameter("scheduler", "zombie_detection_interval"),
+ renamed_to=ConfigParameter("scheduler",
"task_instance_heartbeat_timeout_detection_interval"),
+ ),
+ ConfigChange(
+ config=ConfigParameter("scheduler", "child_process_log_directory"),
+ renamed_to=ConfigParameter("logging",
"dag_processor_child_process_log_directory"),
),
# celery
ConfigChange(
@@ -414,6 +724,29 @@ CONFIGS_CHANGES = [
config=ConfigParameter("smtp", "smtp_password"),
suggestion="Please use the SMTP connection (`smtp_default`).",
),
+ # database
+ ConfigChange(
+ config=ConfigParameter("database", "load_default_connections"),
+ ),
+ # triggerer
+ ConfigChange(
+ config=ConfigParameter("triggerer", "default_capacity"),
+ renamed_to=ConfigParameter("triggerer", "capacity"),
+ breaking=True,
+ ),
+ # email
+ ConfigChange(
+ config=ConfigParameter("email", "email_backend"),
+ was_removed=True,
+ remove_if_equals="airflow.contrib.utils.sendgrid.send_email",
+ ),
+ # elasticsearch
+ ConfigChange(
+ config=ConfigParameter("elasticsearch", "log_id_template"),
+ was_removed=True,
+ remove_if_equals="{dag_id}-{task_id}-{logical_date}-{try_number}",
+ breaking=True,
+ ),
]
@@ -449,14 +782,14 @@ def lint_config(args) -> None:
1. Lint all sections and options:
airflow config lint
- 2. Lint a specific sections:
+ 2. Lint a specific section:
airflow config lint --section core,webserver
- 3. Lint a specific sections and options:
+ 3. Lint specific sections and options:
airflow config lint --section smtp --option smtp_user
- 4. Ignore a sections:
- irflow config lint --ignore-section webserver,api
+ 4. Ignore a section:
+ airflow config lint --ignore-section webserver,api
5. Ignore an options:
airflow config lint --ignore-option smtp_user,session_lifetime_days
@@ -485,8 +818,11 @@ def lint_config(args) -> None:
if configuration.config.section in ignore_sections or
configuration.config.option in ignore_options:
continue
- if conf.has_option(configuration.config.section,
configuration.config.option):
- lint_issues.append(configuration.message)
+ if conf.has_option(
+ configuration.config.section, configuration.config.option,
lookup_from_deprecated=False
+ ):
+ if configuration.message is not None:
+ lint_issues.append(configuration.message)
if lint_issues:
console.print("[red]Found issues in your airflow.cfg:[/red]")
@@ -499,3 +835,179 @@ def lint_config(args) -> None:
console.print("\n[red]Please update your configuration file
accordingly.[/red]")
else:
console.print("[green]No issues found in your airflow.cfg. It is ready
for Airflow 3![/green]")
+
+
+@providers_configuration_loaded
+def update_config(args) -> None:
+ """
+ Update the airflow.cfg file to migrate configuration changes from Airflow
2.x to Airflow 3.
+
+ By default, this command will perform a dry-run (showing the changes only)
and list only
+ the breaking configuration changes by scanning the current configuration
file for parameters that have
+ been renamed, removed, or had their default values changed in Airflow 3.0.
To see or fix all recommended
+ changes, use the --all-recommendations argument. To automatically update
your airflow.cfg file, use
+ the --fix argument. This command cleans up the existing comments in
airflow.cfg but creates a backup of
+ the old airflow.cfg file.
+
+ CLI Arguments:
+ --fix: flag (optional)
+ Automatically fix/apply the breaking changes (or all changes if
--all-recommendations is also
+ specified)
+ Example: --fix
+
+ --all-recommendations: flag (optional)
+ Include non-breaking (recommended) changes as well as breaking
ones.
+ Can be used with --fix.
+ Example: --all-recommendations
+
+ --section: str (optional)
+ Comma-separated list of configuration sections to update.
+ Example: --section core,database
+
+ --option: str (optional)
+ Comma-separated list of configuration options to update.
+ Example: --option sql_alchemy_conn,dag_concurrency
+
+ --ignore-section: str (optional)
+ Comma-separated list of configuration sections to ignore during
update.
+ Example: --ignore-section webserver
+
+ --ignore-option: str (optional)
+ Comma-separated list of configuration options to ignore during
update.
+ Example: --ignore-option check_slas
+
+ Examples:
+ 1. Dry-run mode (print the changes in modified airflow.cfg) showing
only breaking changes:
+ airflow config update
+
+ 2. Dry-run mode showing all recommendations:
+ airflow config update --all-recommendations
+
+ 3. Apply (fix) only breaking changes:
+ airflow config update --fix
+
+ 4. Apply (fix) all recommended changes:
+ airflow config update --fix --all-recommendations
+
+ 5. Show changes only the specific sections:
+ airflow config update --section core,database
+
+ 6.Show changes only the specific options:
+ airflow config update --option sql_alchemy_conn,dag_concurrency
+
+ 7. Ignores the specific section:
+ airflow config update --ignore-section webserver
+
+ :param args: The CLI arguments for updating configuration.
+ """
+ console = AirflowConsole()
+ changes_applied: list[str] = []
+ modifications = ConfigModifications()
+
+ include_all = args.all_recommendations if args.all_recommendations else
False
+ apply_fix = args.fix if args.fix else False
+ dry_run = not apply_fix
+ update_sections = args.section if args.section else None
+ update_options = args.option if args.option else None
+ ignore_sections = args.ignore_section if args.ignore_section else []
+ ignore_options = args.ignore_option if args.ignore_option else []
+
+ config_dict = conf.as_dict(
+ display_source=True,
+ include_env=False,
+ include_cmds=False,
+ include_secret=True,
+ display_sensitive=True,
+ )
+ for change in CONFIGS_CHANGES:
+ if not include_all and not change.breaking:
+ continue
+ conf_section = change.config.section.lower()
+ conf_option = change.config.option.lower()
+ full_key = f"{conf_section}.{conf_option}"
+
+ if update_sections is not None and conf_section not in [s.lower() for
s in update_sections]:
+ continue
+ if update_options is not None and full_key not in [opt.lower() for opt
in update_options]:
+ continue
+ if conf_section in [s.lower() for s in ignore_sections] or full_key in
[
+ opt.lower() for opt in ignore_options
+ ]:
+ continue
+
+ if conf_section not in config_dict or conf_option not in
config_dict[conf_section]:
+ continue
+ value_data = config_dict[conf_section][conf_option]
+ if not (isinstance(value_data, tuple) and value_data[1] ==
"airflow.cfg"):
+ continue
+
+ current_value = value_data[0]
+ prefix = "[[red]BREAKING[/red]]" if change.breaking else
"[[yellow]Recommended[/yellow]]"
+ if change.default_change:
+ if str(current_value) != str(change.new_default):
+ modifications.add_default_update(conf_section, conf_option,
str(change.new_default))
+ changes_applied.append(
+ f"{prefix} Updated default value of
'{conf_section}/{conf_option}' from "
+ f"'{current_value}' to '{change.new_default}'."
+ )
+ if change.renamed_to:
+ modifications.add_rename(
+ conf_section, conf_option, change.renamed_to.section,
change.renamed_to.option
+ )
+ changes_applied.append(
+ f"{prefix} Renamed '{conf_section}/{conf_option}' to "
+
f"'{change.renamed_to.section.lower()}/{change.renamed_to.option.lower()}'."
+ )
+ elif change.was_removed:
+ if change.remove_if_equals is not None:
+ if str(current_value) == str(change.remove_if_equals):
+ modifications.add_remove(conf_section, conf_option)
+ changes_applied.append(
+ f"{prefix} Removed '{conf_section}/{conf_option}' from
configuration."
+ )
+ else:
+ modifications.add_remove(conf_section, conf_option)
+ changes_applied.append(f"{prefix} Removed
'{conf_section}/{conf_option}' from configuration.")
+
+ backup_path = f"{AIRFLOW_CONFIG}.bak"
+ try:
+ shutil.copy2(AIRFLOW_CONFIG, backup_path)
+ console.print(f"Backup saved as '{backup_path}'.")
+ except Exception as e:
+ console.print(f"Failed to create backup: {e}")
+ raise AirflowConfigException("Backup creation failed. Aborting
update_config operation.")
+
+ if dry_run:
+ console.print("[blue]Dry-run mode enabled. No changes will be written
to airflow.cfg.[/blue]")
+ with StringIO() as config_output:
+ conf.write_custom_config(
+ file=config_output,
+ comment_out_defaults=True,
+ include_descriptions=True,
+ modifications=modifications,
+ )
+ new_config = config_output.getvalue()
+ console.print(new_config)
+ else:
+ with open(AIRFLOW_CONFIG, "w") as config_file:
+ conf.write_custom_config(
+ file=config_file,
+ comment_out_defaults=True,
+ include_descriptions=True,
+ modifications=modifications,
+ )
+
+ if changes_applied:
+ console.print("[green]The following are the changes in airflow
config:[/green]")
+ for change_msg in changes_applied:
+ console.print(f" - {change_msg}")
+ if dry_run:
+ console.print(
+ "[blue]Dry-run is mode enabled. To apply above airflow.cfg run
the command "
+ "with `--fix`.[/blue]"
+ )
+ else:
+ console.print("[green]No updates needed. Your configuration is already
up-to-date.[/green]")
+
+ if args.verbose:
+ console.print("[blue]Configuration update completed with verbose
output enabled.[/blue]")
diff --git a/airflow/config_templates/pre_2_7_defaults.cfg
b/airflow/config_templates/provider_config_fallback_defaults.cfg
similarity index 66%
rename from airflow/config_templates/pre_2_7_defaults.cfg
rename to airflow/config_templates/provider_config_fallback_defaults.cfg
index 2568d42445f..ba92feaef47 100644
--- a/airflow/config_templates/pre_2_7_defaults.cfg
+++ b/airflow/config_templates/provider_config_fallback_defaults.cfg
@@ -16,15 +16,21 @@
# specific language governing permissions and limitations
# under the License.
-# This file contains pre Airflow 2.7, provider defaults for Airflow
configuration.
-# They are provided as fallback option to older version of the
-# providers that might expect them to be present.
+# This file contains provider defaults for Airflow configuration, containing
fallback default values
+# that might be needed when provider classes are being imported - before
provider's configuration
+# is loaded.
+#
+# Unfortunately airflow currently performs a lot of stuff during importing and
some of that might lead
+# to retrieving provider configuration before the defaults for the provider
are loaded.
+#
+# Those are only defaults, so if you have "real" values configured in your
configuration (.cfg file or
+# environment variables) those will be used as usual.
+
+# NOTE!! Do NOT attempt to remove those default fallbacks thinking that they
are unnecessary duplication,
+# at least not until we fix the way how airflow imports "do stuff". This is
unlikely to succeed.
+#
+# You've been warned!
#
-# NOTE !!!! Please DO NOT modify values in the file even if they change in
corresponding
-# providers. The values here should be treated as "read only" and should not
be modified
-# even if defaults in newer versions of corresponding Providers change.
-# They are only here so that backwards compatible behaviour for old provider
-# versions can be maintained.
[atlas]
sasl_enabled = False
@@ -81,6 +87,29 @@ index_patterns = _all
use_ssl = False
verify_certs = True
+[opensearch]
+host =
+port =
+username =
+password =
+log_id_template = {dag_id}-{task_id}-{run_id}-{map_index}-{try_number}
+end_of_log_mark = end_of_log
+write_stdout = False
+json_format = False
+json_fields = asctime, filename, lineno, levelname, message
+host_field = host
+offset_field = offset
+index_patterns = _all
+index_patterns_callable =
+
+[opensearch_configs]
+http_compress = False
+use_ssl = False
+verify_certs = False
+ssl_assert_hostname = False
+ssl_show_warn = False
+ca_certs =
+
[kubernetes_executor]
api_client_retry_configuration =
logs_task_metadata = False
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 3affeb3c24d..fdb8e3821dd 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations
+import builtins
import contextlib
import datetime
import functools
@@ -72,6 +73,30 @@ ConfigSourcesType = Dict[str, ConfigSectionSourcesType]
ENV_VAR_PREFIX = "AIRFLOW__"
+class ConfigModifications:
+ """
+ Holds modifications to be applied when writing out the config.
+
+ :param rename: Mapping from (old_section, old_option) to (new_section,
new_option)
+ :param remove: Set of (section, option) to remove
+ :param default_updates: Mapping from (section, option) to new default value
+ """
+
+ def __init__(self) -> None:
+ self.rename: dict[tuple[str, str], tuple[str, str]] = {}
+ self.remove: builtins.set[tuple[str, str]] = builtins.set() # mypy is
conflicting with conf set
+ self.default_updates: dict[tuple[str, str], str] = {}
+
+ def add_rename(self, old_section: str, old_option: str, new_section: str,
new_option: str) -> None:
+ self.rename[(old_section, old_option)] = (new_section, new_option)
+
+ def add_remove(self, section: str, option: str) -> None:
+ self.remove.add((section, option))
+
+ def add_default_update(self, section: str, option: str, new_default: str)
-> None:
+ self.default_updates[(section, option)] = new_default
+
+
def _parse_sqlite_version(s: str) -> tuple[int, ...]:
match = _SQLITE3_VERSION_PATTERN.match(s)
if match is None:
@@ -207,7 +232,7 @@ class AirflowConfigParser(ConfigParser):
# interpolation placeholders. The _default_values config parser will
interpolate them
# properly when we call get() on it.
self._default_values =
create_default_config_parser(self.configuration_description)
- self._pre_2_7_default_values = create_pre_2_7_defaults()
+ self._provider_config_fallback_default_values =
create_provider_config_fallback_defaults()
if default_config is not None:
self._update_defaults_from_string(default_config)
self._update_logging_deprecated_template_to_one_from_defaults()
@@ -290,9 +315,9 @@ class AirflowConfigParser(ConfigParser):
return value.replace("%", "%%")
return value
- def get_default_pre_2_7_value(self, section: str, key: str, **kwargs) ->
Any:
- """Get pre 2.7 default config values."""
- return self._pre_2_7_default_values.get(section, key, fallback=None,
**kwargs)
+ def get_provider_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any:
+ """Get provider config fallback default values."""
+ return self._provider_config_fallback_default_values.get(section, key, fallback=None, **kwargs)
# These configuration elements can be fetched as the stdout of commands
# following the "{section}__{name}_cmd" pattern, the idea behind this
@@ -636,6 +661,88 @@ class AirflowConfigParser(ConfigParser):
if needs_separation:
file.write("\n")
+ def write_custom_config(
+ self,
+ file: IO[str],
+ comment_out_defaults: bool = True,
+ include_descriptions: bool = True,
+ extra_spacing: bool = True,
+ modifications: ConfigModifications | None = None,
+ ) -> None:
+ """
+ Write a configuration file using a ConfigModifications object.
+
+ This method includes only options from the current airflow.cfg. For each option:
+ - If it's marked for removal, omit it.
+ - If renamed, output it under its new name and add a comment indicating its original location.
+ - If a default update is specified, apply the new default and output the option as a commented line.
+ - Otherwise, if the current value equals the default and comment_out_defaults is True, output it as a comment.
+ Options absent from the current airflow.cfg are omitted.
+
+ :param file: File to write the configuration.
+ :param comment_out_defaults: If True, options whose value equals the default are written as comments.
+ :param include_descriptions: Whether to include section descriptions.
+ :param extra_spacing: Whether to insert an extra blank line after each option.
+ :param modifications: ConfigModifications instance with rename, remove, and default updates.
+ """
+ modifications = modifications or ConfigModifications()
+ output: dict[str, list[tuple[str, str, bool, str]]] = {}
+
+ for section in self._sections: # type: ignore[attr-defined] # accessing _sections from ConfigParser
+ for option, orig_value in self._sections[section].items(): # type: ignore[attr-defined]
+ key = (section.lower(), option.lower())
+ if key in modifications.remove:
+ continue
+
+ mod_comment = ""
+ if key in modifications.rename:
+ new_sec, new_opt = modifications.rename[key]
+ effective_section = new_sec
+ effective_option = new_opt
+ mod_comment += f"# Renamed from {section}.{option}\n"
+ else:
+ effective_section = section
+ effective_option = option
+
+ value = orig_value
+ if key in modifications.default_updates:
+ mod_comment += (
+ f"# Default updated from {orig_value} to {modifications.default_updates[key]}\n"
+ )
+ value = modifications.default_updates[key]
+
+ default_value = self.get_default_value(effective_section,
effective_option, fallback="")
+ is_default = str(value) == str(default_value)
+ output.setdefault(effective_section.lower(), []).append(
+ (effective_option, str(value), is_default, mod_comment)
+ )
+
+ for section, options in output.items():
+ section_buffer = StringIO()
+ section_buffer.write(f"[{section}]\n")
+ if include_descriptions:
+ description = self.configuration_description.get(section,
{}).get("description", "")
+ if description:
+ for line in description.splitlines():
+ section_buffer.write(f"# {line}\n")
+ section_buffer.write("\n")
+ for option, value_str, is_default, mod_comment in options:
+ key = (section.lower(), option.lower())
+ if key in modifications.default_updates and
comment_out_defaults:
+ section_buffer.write(f"# {option} = {value_str}\n")
+ else:
+ if mod_comment:
+ section_buffer.write(mod_comment)
+ if is_default and comment_out_defaults:
+ section_buffer.write(f"# {option} = {value_str}\n")
+ else:
+ section_buffer.write(f"{option} = {value_str}\n")
+ if extra_spacing:
+ section_buffer.write("\n")
+ content = section_buffer.getvalue().strip()
+ if content:
+ file.write(f"{content}\n\n")
+
def write( # type: ignore[override]
self,
file: IO[str],
@@ -961,58 +1068,62 @@ class AirflowConfigParser(ConfigParser):
section: str,
key: str,
suppress_warnings: bool = False,
+ lookup_from_deprecated: bool = True,
_extra_stacklevel: int = 0,
**kwargs,
) -> str | None:
section = section.lower()
key = key.lower()
warning_emitted = False
- deprecated_section: str | None
- deprecated_key: str | None
+ deprecated_section: str | None = None
+ deprecated_key: str | None = None
- option_description = self.configuration_description.get(section,
{}).get(key, {})
- if option_description.get("deprecated"):
- deprecation_reason = option_description.get("deprecation_reason",
"")
- warnings.warn(
- f"The '{key}' option in section {section} is deprecated.
{deprecation_reason}",
- DeprecationWarning,
- stacklevel=2 + _extra_stacklevel,
+ if lookup_from_deprecated:
+ option_description = (
+ self.configuration_description.get(section, {}).get("options",
{}).get(key, {})
)
- # For when we rename whole sections
- if section in self.inversed_deprecated_sections:
- deprecated_section, deprecated_key = (section, key)
- section = self.inversed_deprecated_sections[section]
- if not self._suppress_future_warnings:
- warnings.warn(
- f"The config section [{deprecated_section}] has been
renamed to "
- f"[{section}]. Please update your `conf.get*` call to use
the new name",
- FutureWarning,
- stacklevel=2 + _extra_stacklevel,
- )
- # Don't warn about individual rename if the whole section is
renamed
- warning_emitted = True
- elif (section, key) in self.inversed_deprecated_options:
- # Handle using deprecated section/key instead of the new
section/key
- new_section, new_key = self.inversed_deprecated_options[(section,
key)]
- if not self._suppress_future_warnings and not warning_emitted:
+ if option_description.get("deprecated"):
+ deprecation_reason =
option_description.get("deprecation_reason", "")
warnings.warn(
- f"section/key [{section}/{key}] has been deprecated, you
should use"
- f"[{new_section}/{new_key}] instead. Please update your
`conf.get*` call to use the "
- "new name",
- FutureWarning,
+ f"The '{key}' option in section {section} is deprecated.
{deprecation_reason}",
+ DeprecationWarning,
stacklevel=2 + _extra_stacklevel,
)
+ # For the cases in which we rename whole sections
+ if section in self.inversed_deprecated_sections:
+ deprecated_section, deprecated_key = (section, key)
+ section = self.inversed_deprecated_sections[section]
+ if not self._suppress_future_warnings:
+ warnings.warn(
+ f"The config section [{deprecated_section}] has been
renamed to "
+ f"[{section}]. Please update your `conf.get*` call to
use the new name",
+ FutureWarning,
+ stacklevel=2 + _extra_stacklevel,
+ )
+ # Don't warn about individual rename if the whole section is
renamed
warning_emitted = True
- deprecated_section, deprecated_key = section, key
- section, key = (new_section, new_key)
- elif section in self.deprecated_sections:
- # When accessing the new section name, make sure we check under
the old config name
- deprecated_key = key
- deprecated_section = self.deprecated_sections[section][0]
- else:
- deprecated_section, deprecated_key, _ =
self.deprecated_options.get(
- (section, key), (None, None, None)
- )
+ elif (section, key) in self.inversed_deprecated_options:
+ # Handle using deprecated section/key instead of the new
section/key
+ new_section, new_key =
self.inversed_deprecated_options[(section, key)]
+ if not self._suppress_future_warnings and not warning_emitted:
+ warnings.warn(
+ f"section/key [{section}/{key}] has been deprecated,
you should use"
+ f"[{new_section}/{new_key}] instead. Please update
your `conf.get*` call to use the "
+ "new name",
+ FutureWarning,
+ stacklevel=2 + _extra_stacklevel,
+ )
+ warning_emitted = True
+ deprecated_section, deprecated_key = section, key
+ section, key = (new_section, new_key)
+ elif section in self.deprecated_sections:
+ # When accessing the new section name, make sure we check
under the old config name
+ deprecated_key = key
+ deprecated_section = self.deprecated_sections[section][0]
+ else:
+ deprecated_section, deprecated_key, _ =
self.deprecated_options.get(
+ (section, key), (None, None, None)
+ )
# first check environment variables
option = self._get_environment_variables(
deprecated_key,
@@ -1066,9 +1177,9 @@ class AirflowConfigParser(ConfigParser):
if self.get_default_value(section, key) is not None or "fallback" in
kwargs:
return expand_env_var(self.get_default_value(section, key,
**kwargs))
- if self.get_default_pre_2_7_value(section, key) is not None:
+ if self.get_provider_config_fallback_defaults(section, key) is not
None:
# no expansion needed
- return self.get_default_pre_2_7_value(section, key, **kwargs)
+ return self.get_provider_config_fallback_defaults(section, key,
**kwargs)
if not suppress_warnings:
log.warning("section/key [%s/%s] not found in config", section,
key)
@@ -1317,7 +1428,7 @@ class AirflowConfigParser(ConfigParser):
"""
super().read_dict(dictionary=dictionary, source=source)
- def has_option(self, section: str, option: str) -> bool:
+ def has_option(self, section: str, option: str, lookup_from_deprecated:
bool = True) -> bool:
"""
Check if option is defined.
@@ -1326,10 +1437,18 @@ class AirflowConfigParser(ConfigParser):
:param section: section to get option from
:param option: option to get
+ :param lookup_from_deprecated: If True, check if the option is defined
in deprecated sections
:return:
"""
try:
- value = self.get(section, option, fallback=None,
_extra_stacklevel=1, suppress_warnings=True)
+ value = self.get(
+ section,
+ option,
+ fallback=None,
+ _extra_stacklevel=1,
+ suppress_warnings=True,
+ lookup_from_deprecated=lookup_from_deprecated,
+ )
if value is None:
return False
return True
@@ -1459,7 +1578,7 @@ class AirflowConfigParser(ConfigParser):
# We check sequentially all those sources and the last one we saw it
in will "win"
configs: Iterable[tuple[str, ConfigParser]] = [
- ("default-pre-2-7", self._pre_2_7_default_values),
+ ("provider-fallback-defaults",
self._provider_config_fallback_default_values),
("default", self._default_values),
("airflow.cfg", self),
]
@@ -1985,17 +2104,27 @@ def create_default_config_parser(configuration_description: dict[str, dict[str,
return parser
-def create_pre_2_7_defaults() -> ConfigParser:
+def create_provider_config_fallback_defaults() -> ConfigParser:
"""
- Create parser using the old defaults from Airflow < 2.7.0.
+ Create fallback defaults.
+
+ This parser contains provider defaults for Airflow configuration, containing fallback default values
+ that might be needed when provider classes are being imported - before provider's configuration
+ is loaded.
+
+ Unfortunately airflow currently performs a lot of stuff during importing and some of that might lead
+ to retrieving provider configuration before the defaults for the provider are loaded.
+
+ Those are only defaults, so if you have "real" values configured in your configuration (.cfg file or
+ environment variables) those will be used as usual.
+
+ NOTE!! Do NOT attempt to remove those default fallbacks thinking that they are unnecessary duplication,
+ at least not until we fix the way how airflow imports "do stuff". This is unlikely to succeed.
- This is used in order to be able to fall-back to those defaults when old version of provider,
- not supporting "config contribution" is installed with Airflow 2.7.0+. This "default"
- configuration does not support variable expansion, those are pretty much hard-coded defaults '
- we want to fall-back to in such case.
+ You've been warned!
"""
config_parser = ConfigParser()
- config_parser.read(_default_config_file_path("pre_2_7_defaults.cfg"))
+ config_parser.read(_default_config_file_path("provider_config_fallback_defaults.cfg"))
return config_parser
diff --git a/tests/cli/commands/test_config_command.py b/tests/cli/commands/test_config_command.py
index 5e87b90e31f..8330127c79d 100644
--- a/tests/cli/commands/test_config_command.py
+++ b/tests/cli/commands/test_config_command.py
@@ -19,6 +19,7 @@ from __future__ import annotations
import contextlib
import os
import re
+import shutil
from io import StringIO
from unittest import mock
@@ -27,6 +28,7 @@ import pytest
from airflow.cli import cli_parser
from airflow.cli.commands import config_command
from airflow.cli.commands.config_command import ConfigChange, ConfigParameter
+from airflow.configuration import conf
from tests.test_utils.config import conf_vars
STATSD_CONFIG_BEGIN_WITH = "# `StatsD <https://github.com/statsd/statsd>`"
@@ -233,7 +235,10 @@ class TestCliConfigGetValue:
class TestConfigLint:
- @pytest.mark.parametrize("removed_config", config_command.CONFIGS_CHANGES)
+ @pytest.mark.parametrize(
+ "removed_config",
+ [config for config in config_command.CONFIGS_CHANGES if
config.was_removed and config.message],
+ )
def test_lint_detects_removed_configs(self, removed_config):
with mock.patch("airflow.configuration.conf.has_option",
return_value=True):
with contextlib.redirect_stdout(StringIO()) as temp_stdout:
@@ -246,6 +251,23 @@ class TestConfigLint:
assert normalized_message in normalized_output
+ @pytest.mark.parametrize(
+ "default_changed_config",
+ [config for config in config_command.CONFIGS_CHANGES if
config.default_change],
+ )
+ def test_lint_detects_default_changed_configs(self,
default_changed_config):
+ with mock.patch("airflow.configuration.conf.has_option",
return_value=True):
+ with contextlib.redirect_stdout(StringIO()) as temp_stdout:
+
config_command.lint_config(cli_parser.get_parser().parse_args(["config",
"lint"]))
+
+ output = temp_stdout.getvalue()
+
+ if default_changed_config.message is not None:
+ normalized_output = re.sub(r"\s+", " ", output.strip())
+ normalized_message = re.sub(r"\s+", " ",
default_changed_config.message.strip())
+
+ assert normalized_message in normalized_output
+
@pytest.mark.parametrize(
"section, option, suggestion",
[
@@ -316,7 +338,8 @@ class TestConfigLint:
def test_lint_detects_multiple_issues(self):
with mock.patch(
"airflow.configuration.conf.has_option",
- side_effect=lambda s, o: o in ["check_slas",
"strict_dataset_uri_validation"],
+ side_effect=lambda section, option, lookup_from_deprecated: option
+ in ["check_slas", "strict_dataset_uri_validation"],
):
with contextlib.redirect_stdout(StringIO()) as temp_stdout:
config_command.lint_config(cli_parser.get_parser().parse_args(["config",
"lint"]))
@@ -417,7 +440,7 @@ class TestConfigLint:
"Removed deprecated `check_slas` configuration parameter from
`core` section.",
),
(
- "AIRFLOW__CORE__STRICT_ASSET_URI_VALIDATION",
+ "AIRFLOW__CORE__strict_dataset_uri_validation",
ConfigChange(
config=ConfigParameter("core",
"strict_dataset_uri_validation"),
suggestion="Dataset URI with a defined scheme will now
always be validated strictly, raising a hard error on validation failure.",
@@ -438,3 +461,122 @@ class TestConfigLint:
assert expected_message in normalized_output
assert config_change.suggestion in normalized_output
+
+ def test_lint_detects_invalid_config(self):
+ with mock.patch.dict(os.environ, {"AIRFLOW__CORE__PARALLELISM": "0"}):
+ with contextlib.redirect_stdout(StringIO()) as temp_stdout:
+
config_command.lint_config(cli_parser.get_parser().parse_args(["config",
"lint"]))
+
+ output = temp_stdout.getvalue()
+
+ normalized_output = re.sub(r"\s+", " ", output.strip())
+
+ assert (
+ "Invalid value `0` set for `parallelism` configuration parameter
in `core` section."
+ in normalized_output
+ )
+
+ def test_lint_detects_invalid_config_negative(self):
+ with mock.patch.dict(os.environ, {"AIRFLOW__CORE__PARALLELISM": "42"}):
+ with contextlib.redirect_stdout(StringIO()) as temp_stdout:
+
config_command.lint_config(cli_parser.get_parser().parse_args(["config",
"lint"]))
+
+ output = temp_stdout.getvalue()
+
+ normalized_output = re.sub(r"\s+", " ", output.strip())
+
+ assert "Invalid value" not in normalized_output
+
+
+class TestCliConfigUpdate:
+ @classmethod
+ def setup_class(cls):
+ cls.parser = cli_parser.get_parser()
+
+ @pytest.fixture(autouse=True)
+ def setup_fake_airflow_cfg(self, tmp_path, monkeypatch):
+ fake_config = tmp_path / "airflow.cfg"
+ fake_config.write_text(
+ """
+ [test_admin]
+ rename_key = legacy_value
+ remove_key = to_be_removed
+ [test_core]
+ dags_folder = /some/path/to/dags
+ default_key = OldDefault"""
+ )
+ monkeypatch.setenv("AIRFLOW_CONFIG", str(fake_config))
+ monkeypatch.setattr(config_command, "AIRFLOW_CONFIG", str(fake_config))
+ conf.read(str(fake_config))
+ return fake_config
+
+ def test_update_renamed_option(self, monkeypatch, setup_fake_airflow_cfg):
+ fake_config = setup_fake_airflow_cfg
+ renamed_change = ConfigChange(
+ config=ConfigParameter("test_admin", "rename_key"),
+ renamed_to=ConfigParameter("test_core", "renamed_key"),
+ )
+ monkeypatch.setattr(config_command, "CONFIGS_CHANGES",
[renamed_change])
+ assert conf.has_option("test_admin", "rename_key")
+ args = self.parser.parse_args(["config", "update", "--fix",
"--all-recommendations"])
+ config_command.update_config(args)
+ content = fake_config.read_text()
+ admin_section = content.split("[test_admin]")[-1]
+ assert "rename_key" not in admin_section
+ core_section = content.split("[test_core]")[-1]
+ assert "renamed_key" in core_section
+ assert "# Renamed from test_admin.rename_key" in content
+
+ def test_update_removed_option(self, monkeypatch, setup_fake_airflow_cfg):
+ fake_config = setup_fake_airflow_cfg
+ removed_change = ConfigChange(
+ config=ConfigParameter("test_admin", "remove_key"),
+ suggestion="Option removed in Airflow 3.0.",
+ )
+ monkeypatch.setattr(config_command, "CONFIGS_CHANGES",
[removed_change])
+ assert conf.has_option("test_admin", "remove_key")
+ args = self.parser.parse_args(["config", "update", "--fix",
"--all-recommendations"])
+ config_command.update_config(args)
+ content = fake_config.read_text()
+ assert "remove_key" not in content
+
+ def test_update_no_changes(self, monkeypatch, capsys):
+ monkeypatch.setattr(config_command, "CONFIGS_CHANGES", [])
+ args = self.parser.parse_args(["config", "update"])
+ config_command.update_config(args)
+ captured = capsys.readouterr().out
+ assert "No updates needed" in captured
+
+ def test_update_backup_creation(self, monkeypatch):
+ removed_change = ConfigChange(
+ config=ConfigParameter("test_admin", "remove_key"),
+ suggestion="Option removed.",
+ )
+ monkeypatch.setattr(config_command, "CONFIGS_CHANGES",
[removed_change])
+ assert conf.has_option("test_admin", "remove_key")
+ args = self.parser.parse_args(["config", "update"])
+ mock_copy = mock.MagicMock()
+ monkeypatch.setattr(shutil, "copy2", mock_copy)
+ config_command.update_config(args)
+ backup_path = os.environ.get("AIRFLOW_CONFIG") + ".bak"
+ mock_copy.assert_called_once_with(os.environ.get("AIRFLOW_CONFIG"),
backup_path)
+
+ def test_update_only_breaking_changes_with_fix(self, monkeypatch,
setup_fake_airflow_cfg):
+ fake_config = setup_fake_airflow_cfg
+ breaking_change = ConfigChange(
+ config=ConfigParameter("test_admin", "rename_key"),
+ renamed_to=ConfigParameter("test_admin", "new_breaking_key"),
+ breaking=True,
+ )
+ non_breaking_change = ConfigChange(
+ config=ConfigParameter("test_admin", "remove_key"),
+ suggestion="Option removed.",
+ breaking=False,
+ )
+ monkeypatch.setattr(config_command, "CONFIGS_CHANGES",
[breaking_change, non_breaking_change])
+ args = self.parser.parse_args(["config", "update", "--fix"])
+ config_command.update_config(args)
+ content = fake_config.read_text()
+ assert "rename_key = legacy_value" not in content
+ assert "new_breaking_key" in content
+ assert "remove_key" in content
diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py
index d48c4665310..cceef50a313 100644
--- a/tests/core/test_configuration.py
+++ b/tests/core/test_configuration.py
@@ -954,6 +954,48 @@ class TestDeprecatedConf:
with pytest.warns(DeprecationWarning), conf_vars({("celery",
"celeryd_concurrency"): "99"}):
assert conf.getint("celery", "worker_concurrency") == 99
+ @pytest.mark.parametrize(
+ "deprecated_options_dict, kwargs, new_section_expected_value,
old_section_expected_value",
+ [
+ pytest.param(
+ {("old_section", "old_key"): ("new_section", "new_key",
"2.0.0")},
+ {"fallback": None},
+ None,
+ "value",
+ id="deprecated_in_different_section_lookup_enabled",
+ ),
+ pytest.param(
+ {("old_section", "old_key"): ("new_section", "new_key",
"2.0.0")},
+ {"fallback": None, "lookup_from_deprecated": False},
+ None,
+ None,
+ id="deprecated_in_different_section_lookup_disabled",
+ ),
+ pytest.param(
+ {("new_section", "old_key"): ("new_section", "new_key",
"2.0.0")},
+ {"fallback": None},
+ "value",
+ None,
+ id="deprecated_in_same_section_lookup_enabled",
+ ),
+ pytest.param(
+ {("new_section", "old_key"): ("new_section", "new_key",
"2.0.0")},
+ {"fallback": None, "lookup_from_deprecated": False},
+ None,
+ None,
+ id="deprecated_in_same_section_lookup_disabled",
+ ),
+ ],
+ )
+ def test_deprecated_options_with_lookup_from_deprecated(
+ self, deprecated_options_dict, kwargs, new_section_expected_value,
old_section_expected_value
+ ):
+ with conf_vars({("new_section", "new_key"): "value"}):
+ with
set_deprecated_options(deprecated_options=deprecated_options_dict):
+ assert conf.get("new_section", "old_key", **kwargs) ==
new_section_expected_value
+
+ assert conf.get("old_section", "old_key", **kwargs) ==
old_section_expected_value
+
@conf_vars(
{
("logging", "logging_level"): None,