This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch v2-11-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-11-test by this push:
     new a0062c8a509 Backport `airflow config update` and `airflow config lint` 
changes (#50353)
a0062c8a509 is described below

commit a0062c8a5092c2c2e6587a5d6b0bbfd75e12b58f
Author: Ankit Chaurasia <[email protected]>
AuthorDate: Fri May 9 14:47:31 2025 +0545

    Backport `airflow config update` and `airflow config lint` changes (#50353)
    
    * Fix Legacy configuration not removed or updated (#46544)
    
    * Fix Legacy configuration not removed or updated
    
    (cherry picked from commit 971973725bd368297fda8dbe096ed6b199440ad0)
    
    * allow_trigger_in_future config removal (#46667)
    
    (cherry picked from commit 65bacad3199d302ba5d9ddb8486f7029e6b171f1)
    
    * Backport airflow config update and airflow config lint changes
    
    * Rename pre-2-7 defaults to provider_config_fallback_defaults (#44895)
    
    * Rename pre-2-7 defaults to provider_config_fallback_defaults
    
    After unsuccessful atempts to remove them, renaming and adding
    appropriate warning against removing them seems to be a good idea.
    
    * Update airflow/config_templates/provider_config_fallback_defaults.cfg
    
    Co-authored-by: Jed Cunningham 
<[email protected]>
    
    ---------
    
    Co-authored-by: Jed Cunningham 
<[email protected]>
    (cherry picked from commit 8c013b6a89da81832270841dea529f86cbe42c6f)
    
    * Fix `lookup_from_deprecated_options` in AirflowConfigParser (#47004)
    
    * Fix deprecated_options in AirflowConfigParser
    
    * Add test_deprecated_options_with_lookup_from_deprecated, rename as 
lookup_from_deprecated
    
    * fixup! Add docstring for lookup_from_deprecated
    
    * Fix fixture params naming
    
    (cherry picked from commit 8e3d25f909756b7438092ace915293b443026d37)
    
    * Fix the tests
    
    ---------
    
    Co-authored-by: LIU ZHE YOU <[email protected]>
    Co-authored-by: Phani Kumar <[email protected]>
    Co-authored-by: Jarek Potiuk <[email protected]>
    Co-authored-by: Jed Cunningham 
<[email protected]>
---
 airflow/cli/cli_config.py                          |  49 +-
 airflow/cli/commands/config_command.py             | 546 ++++++++++++++++++++-
 ...s.cfg => provider_config_fallback_defaults.cfg} |  45 +-
 airflow/configuration.py                           | 241 ++++++---
 tests/cli/commands/test_config_command.py          | 148 +++++-
 tests/core/test_configuration.py                   |  42 ++
 6 files changed, 986 insertions(+), 85 deletions(-)

diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index ce753e2f4bb..ccc81ffc77c 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -973,6 +973,39 @@ ARG_LINT_CONFIG_IGNORE_OPTION = Arg(
     type=string_list_type,
 )
 
+# config update
+ARG_UPDATE_CONFIG_SECTION = Arg(
+    ("--section",),
+    help="The section name(s) to update in the airflow config.",
+    type=string_list_type,
+)
+ARG_UPDATE_CONFIG_OPTION = Arg(
+    ("--option",),
+    help="The option name(s) to update in the airflow config.",
+    type=string_list_type,
+)
+ARG_UPDATE_CONFIG_IGNORE_SECTION = Arg(
+    ("--ignore-section",),
+    help="The section name(s) to ignore to update in the airflow config.",
+    type=string_list_type,
+)
+ARG_UPDATE_CONFIG_IGNORE_OPTION = Arg(
+    ("--ignore-option",),
+    help="The option name(s) to ignore to update in the airflow config.",
+    type=string_list_type,
+)
+ARG_UPDATE_CONFIG_FIX = Arg(
+    ("--fix",),
+    help="Automatically apply the configuration changes instead of performing 
a dry run. (Default: dry-run mode)",
+    action="store_true",
+)
+
+ARG_UPDATE_ALL_RECOMMENDATIONS = Arg(
+    ("--all-recommendations",),
+    help="Include non-breaking (recommended) changes along with breaking ones. 
(Also use with --fix)",
+    action="store_true",
+)
+
 # kubernetes cleanup-pods
 ARG_NAMESPACE = Arg(
     ("--namespace",),
@@ -1894,7 +1927,7 @@ CONFIG_COMMANDS = (
     ActionCommand(
         name="lint",
         help="lint options for the configuration changes while migrating from 
Airflow 2.x to Airflow 3.0",
-        
func=lazy_load_command("airflow.cli.commands.remote_commands.config_command.lint_config"),
+        
func=lazy_load_command("airflow.cli.commands.config_command.lint_config"),
         args=(
             ARG_LINT_CONFIG_SECTION,
             ARG_LINT_CONFIG_OPTION,
@@ -1903,6 +1936,20 @@ CONFIG_COMMANDS = (
             ARG_VERBOSE,
         ),
     ),
+    ActionCommand(
+        name="update",
+        help="update options for the configuration changes while migrating 
from Airflow 2.x to Airflow 3.0",
+        
func=lazy_load_command("airflow.cli.commands.config_command.update_config"),
+        args=(
+            ARG_UPDATE_CONFIG_SECTION,
+            ARG_UPDATE_CONFIG_OPTION,
+            ARG_UPDATE_CONFIG_IGNORE_SECTION,
+            ARG_UPDATE_CONFIG_IGNORE_OPTION,
+            ARG_VERBOSE,
+            ARG_UPDATE_CONFIG_FIX,
+            ARG_UPDATE_ALL_RECOMMENDATIONS,
+        ),
+    ),
 )
 
 KUBERNETES_COMMANDS = (
diff --git a/airflow/cli/commands/config_command.py 
b/airflow/cli/commands/config_command.py
index 36be0cd2e06..0f5c5df5ba1 100644
--- a/airflow/cli/commands/config_command.py
+++ b/airflow/cli/commands/config_command.py
@@ -18,15 +18,16 @@
 
 from __future__ import annotations
 
+import shutil
 from dataclasses import dataclass
 from io import StringIO
-from typing import NamedTuple
+from typing import Any, NamedTuple
 
 import pygments
 from pygments.lexers.configs import IniLexer
 
 from airflow.cli.simple_table import AirflowConsole
-from airflow.configuration import conf
+from airflow.configuration import AIRFLOW_CONFIG, ConfigModifications, conf
 from airflow.exceptions import AirflowConfigException
 from airflow.utils.cli import should_use_colors
 from airflow.utils.code_utils import get_terminal_formatter
@@ -82,19 +83,41 @@ class ConfigChange:
     Class representing the configuration changes in Airflow 3.0.
 
     :param config: The configuration parameter being changed.
+    :param default_change: If the change is a default value change.
+    :param old_default: The old default value (valid only if default_change is 
True).
+    :param new_default: The new default value for the configuration parameter.
     :param suggestion: A suggestion for replacing or handling the removed 
configuration.
     :param renamed_to: The new section and option if the configuration is 
renamed.
     :param was_deprecated: If the config is removed, whether the old config 
was deprecated.
+    :param was_removed: If the config is removed.
+    :param is_invalid_if: If the current config value is invalid in the future.
+    :param breaking: Mark if this change is known to be breaking and causing 
errors/ warnings / deprecations.
+    :param remove_if_equals: For removal rules, remove the option only if its 
current value equals this value.
     """
 
     config: ConfigParameter
+    default_change: bool = False
+    old_default: str | bool | int | float | None = None
+    new_default: str | bool | int | float | None = None
     suggestion: str = ""
     renamed_to: ConfigParameter | None = None
     was_deprecated: bool = True
+    was_removed: bool = True
+    is_invalid_if: Any = None
+    breaking: bool = False
+    remove_if_equals: str | bool | int | float | None = None
 
     @property
-    def message(self) -> str:
+    def message(self) -> str | None:
         """Generate a message for this configuration change."""
+        if self.default_change:
+            value = conf.get(self.config.section, self.config.option)
+            if value != self.new_default:
+                return (
+                    f"Changed default value of `{self.config.option}` in 
`{self.config.section}` "
+                    f"from `{self.old_default}` to `{self.new_default}`. "
+                    f"You currently have `{value}` set. {self.suggestion}"
+                )
         if self.renamed_to:
             if self.config.section != self.renamed_to.section:
                 return (
@@ -105,11 +128,20 @@ class ConfigChange:
                 f"`{self.config.option}` configuration parameter renamed to 
`{self.renamed_to.option}` "
                 f"in the `{self.config.section}` section."
             )
-        return (
-            f"Removed{' deprecated' if self.was_deprecated else ''} 
`{self.config.option}` configuration parameter "
-            f"from `{self.config.section}` section. "
-            f"{self.suggestion}"
-        )
+        if self.was_removed and not self.remove_if_equals:
+            return (
+                f"Removed{' deprecated' if self.was_deprecated else ''} 
`{self.config.option}` configuration parameter "
+                f"from `{self.config.section}` section. "
+                f"{self.suggestion}"
+            )
+        if self.is_invalid_if is not None:
+            value = conf.get(self.config.section, self.config.option)
+            if value == self.is_invalid_if:
+                return (
+                    f"Invalid value `{self.is_invalid_if}` set for 
`{self.config.option}` configuration parameter "
+                    f"in `{self.config.section}` section. {self.suggestion}"
+                )
+        return None
 
 
 CONFIGS_CHANGES = [
@@ -123,6 +155,19 @@ CONFIGS_CHANGES = [
         renamed_to=ConfigParameter("core", "sensitive_var_conn_names"),
     ),
     # core
+    ConfigChange(
+        config=ConfigParameter("core", "executor"),
+        default_change=True,
+        old_default="SequentialExecutor",
+        new_default="LocalExecutor",
+        was_removed=False,
+        breaking=True,
+    ),
+    ConfigChange(
+        config=ConfigParameter("core", "hostname"),
+        was_removed=True,
+        remove_if_equals=":",
+    ),
     ConfigChange(
         config=ConfigParameter("core", "check_slas"),
         suggestion="The SLA feature is removed in Airflow 3.0, to be replaced 
with Airflow Alerts in future",
@@ -132,6 +177,14 @@ CONFIGS_CHANGES = [
         suggestion="Dataset URI with a defined scheme will now always be 
validated strictly, "
         "raising a hard error on validation failure.",
     ),
+    ConfigChange(
+        config=ConfigParameter("core", "dag_default_view"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("core", "dag_orientation"),
+        was_deprecated=False,
+    ),
     ConfigChange(
         config=ConfigParameter("core", "dataset_manager_class"),
         renamed_to=ConfigParameter("core", "asset_manager_class"),
@@ -212,6 +265,12 @@ CONFIGS_CHANGES = [
     ConfigChange(
         config=ConfigParameter("core", "log_processor_filename_template"),
     ),
+    ConfigChange(
+        config=ConfigParameter("core", "parallelism"),
+        was_removed=False,
+        is_invalid_if="0",
+        suggestion="Please set the `parallelism` configuration parameter to a 
value greater than 0.",
+    ),
     # api
     ConfigChange(
         config=ConfigParameter("api", "access_control_allow_origin"),
@@ -219,7 +278,11 @@ CONFIGS_CHANGES = [
     ),
     ConfigChange(
         config=ConfigParameter("api", "auth_backend"),
-        renamed_to=ConfigParameter("api", "auth_backends"),
+        renamed_to=ConfigParameter("fab", "auth_backends"),
+    ),
+    ConfigChange(
+        config=ConfigParameter("api", "auth_backends"),
+        renamed_to=ConfigParameter("fab", "auth_backends"),
     ),
     # logging
     ConfigChange(
@@ -239,6 +302,18 @@ CONFIGS_CHANGES = [
         config=ConfigParameter("logging", "log_processor_filename_template"),
         was_deprecated=False,
     ),
+    ConfigChange(
+        config=ConfigParameter("logging", "log_filename_template"),
+        was_removed=True,
+        remove_if_equals="{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ 
try_number }}.log",
+        breaking=True,
+    ),
+    ConfigChange(
+        config=ConfigParameter("logging", "log_filename_template"),
+        was_removed=True,
+        remove_if_equals="dag_id={{ ti.dag_id }}/run_id={{ ti.run_id 
}}/task_id={{ ti.task_id }}/{% if ti.map_index >= 0 %}map_index={{ ti.map_index 
}}/{% endif %}attempt={{ try_number }}.log",
+        breaking=True,
+    ),
     # metrics
     ConfigChange(
         config=ConfigParameter("metrics", "metrics_use_pattern_match"),
@@ -284,19 +359,207 @@ CONFIGS_CHANGES = [
         config=ConfigParameter("webserver", option="auth_rate_limit"),
         renamed_to=ConfigParameter("fab", "auth_rate_limit"),
     ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "config_file"),
+        renamed_to=ConfigParameter("fab", "config_file"),
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "session_backend"),
+        renamed_to=ConfigParameter("fab", "session_backend"),
+    ),
     ConfigChange(
         config=ConfigParameter("webserver", "session_lifetime_days"),
-        renamed_to=ConfigParameter("webserver", "session_lifetime_minutes"),
+        renamed_to=ConfigParameter("fab", "session_lifetime_minutes"),
     ),
     ConfigChange(
         config=ConfigParameter("webserver", "force_log_out_after"),
-        renamed_to=ConfigParameter("webserver", "session_lifetime_minutes"),
+        renamed_to=ConfigParameter("fab", "session_lifetime_minutes"),
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "session_lifetime_minutes"),
+        renamed_to=ConfigParameter("fab", "session_lifetime_minutes"),
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "base_url"),
+        renamed_to=ConfigParameter("api", "base_url"),
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "web_server_host"),
+        renamed_to=ConfigParameter("api", "host"),
+        breaking=True,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "web_server_port"),
+        renamed_to=ConfigParameter("api", "port"),
+        breaking=True,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "workers"),
+        renamed_to=ConfigParameter("api", "workers"),
+        breaking=True,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "web_server_worker_timeout"),
+        renamed_to=ConfigParameter("api", "worker_timeout"),
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "web_server_ssl_cert"),
+        renamed_to=ConfigParameter("api", "ssl_cert"),
+        breaking=True,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "web_server_ssl_key"),
+        renamed_to=ConfigParameter("api", "ssl_key"),
+        breaking=True,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "access_logfile"),
+        renamed_to=ConfigParameter("api", "access_logfile"),
+        breaking=True,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "error_logfile"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "access_logformat"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "web_server_master_timeout"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "worker_refresh_batch_size"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "worker_refresh_interval"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "reload_on_plugin_change"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "worker_class"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "expose_stacktrace"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "log_fetch_delay_sec"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "log_auto_tailing_offset"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "log_animation_speed"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "default_dag_run_display_number"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "enable_proxy_fix"),
+        renamed_to=ConfigParameter("fab", "enable_proxy_fix"),
+        breaking=True,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "proxy_fix_x_for"),
+        renamed_to=ConfigParameter("fab", "proxy_fix_x_for"),
+        breaking=True,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "proxy_fix_x_proto"),
+        renamed_to=ConfigParameter("fab", "proxy_fix_x_proto"),
+        breaking=True,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "proxy_fix_x_host"),
+        renamed_to=ConfigParameter("fab", "proxy_fix_x_host"),
+        breaking=True,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "proxy_fix_x_port"),
+        renamed_to=ConfigParameter("fab", "proxy_fix_x_port"),
+        breaking=True,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "proxy_fix_x_prefix"),
+        renamed_to=ConfigParameter("fab", "proxy_fix_x_prefix"),
+        breaking=True,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "expose_config"),
+        renamed_to=ConfigParameter("api", "expose_config"),
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "cookie_secure"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "analytics_tool"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "analytics_id"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "analytics_url"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", 
"show_recent_stats_for_completed_runs"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "run_internal_api"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "caching_hash_method"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "show_trigger_form_if_no_params"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", 
"num_recent_configurations_for_trigger"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "allowed_payload_size"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "max_form_memory_size"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "max_form_parts"),
+        was_deprecated=False,
+    ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "default_ui_timezone"),
+        was_deprecated=False,
     ),
     # policy
     ConfigChange(
         config=ConfigParameter("policy", "airflow_local_settings"),
         renamed_to=ConfigParameter("policy", "task_policy"),
     ),
+    ConfigChange(
+        config=ConfigParameter("webserver", "navbar_logo_text_color"),
+        was_deprecated=False,
+    ),
     # scheduler
     ConfigChange(
         config=ConfigParameter("scheduler", "dependency_detector"),
@@ -304,6 +567,36 @@ CONFIGS_CHANGES = [
     ConfigChange(
         config=ConfigParameter("scheduler", "allow_trigger_in_future"),
     ),
+    ConfigChange(
+        config=ConfigParameter("scheduler", "catchup_by_default"),
+        default_change=True,
+        old_default="True",
+        was_removed=False,
+        new_default="False",
+        suggestion="In Airflow 3.0 the default value for `catchup_by_default` 
is set to `False`. "
+        "This means that DAGs without explicit definition of the `catchup` 
parameter will not "
+        "catchup by default. "
+        "If your DAGs rely on catchup behavior, not explicitly defined in the 
DAG definition, "
+        "set this configuration parameter to `True` in the `scheduler` section 
of your `airflow.cfg` "
+        "to enable the behavior from Airflow 2.x.",
+        breaking=True,
+    ),
+    ConfigChange(
+        config=ConfigParameter("scheduler", "create_cron_data_intervals"),
+        default_change=True,
+        old_default="True",
+        new_default="False",
+        was_removed=False,
+        breaking=True,
+    ),
+    ConfigChange(
+        config=ConfigParameter("scheduler", "create_delta_data_intervals"),
+        default_change=True,
+        old_default="True",
+        new_default="False",
+        was_removed=False,
+        breaking=True,
+    ),
     ConfigChange(
         config=ConfigParameter("scheduler", "processor_poll_interval"),
         renamed_to=ConfigParameter("scheduler", "scheduler_idle_sleep_time"),
@@ -382,6 +675,23 @@ CONFIGS_CHANGES = [
     ConfigChange(
         config=ConfigParameter("scheduler", "dag_dir_list_interval"),
         renamed_to=ConfigParameter("dag_processor", "refresh_interval"),
+        breaking=True,
+    ),
+    ConfigChange(
+        config=ConfigParameter("scheduler", "local_task_job_heartbeat_sec"),
+        renamed_to=ConfigParameter("scheduler", "task_instance_heartbeat_sec"),
+    ),
+    ConfigChange(
+        config=ConfigParameter("scheduler", "scheduler_zombie_task_threshold"),
+        renamed_to=ConfigParameter("scheduler", 
"task_instance_heartbeat_timeout"),
+    ),
+    ConfigChange(
+        config=ConfigParameter("scheduler", "zombie_detection_interval"),
+        renamed_to=ConfigParameter("scheduler", 
"task_instance_heartbeat_timeout_detection_interval"),
+    ),
+    ConfigChange(
+        config=ConfigParameter("scheduler", "child_process_log_directory"),
+        renamed_to=ConfigParameter("logging", 
"dag_processor_child_process_log_directory"),
     ),
     # celery
     ConfigChange(
@@ -414,6 +724,29 @@ CONFIGS_CHANGES = [
         config=ConfigParameter("smtp", "smtp_password"),
         suggestion="Please use the SMTP connection (`smtp_default`).",
     ),
+    # database
+    ConfigChange(
+        config=ConfigParameter("database", "load_default_connections"),
+    ),
+    # triggerer
+    ConfigChange(
+        config=ConfigParameter("triggerer", "default_capacity"),
+        renamed_to=ConfigParameter("triggerer", "capacity"),
+        breaking=True,
+    ),
+    # email
+    ConfigChange(
+        config=ConfigParameter("email", "email_backend"),
+        was_removed=True,
+        remove_if_equals="airflow.contrib.utils.sendgrid.send_email",
+    ),
+    # elasticsearch
+    ConfigChange(
+        config=ConfigParameter("elasticsearch", "log_id_template"),
+        was_removed=True,
+        remove_if_equals="{dag_id}-{task_id}-{logical_date}-{try_number}",
+        breaking=True,
+    ),
 ]
 
 
@@ -449,14 +782,14 @@ def lint_config(args) -> None:
         1. Lint all sections and options:
             airflow config lint
 
-        2. Lint a specific sections:
+        2. Lint a specific section:
             airflow config lint --section core,webserver
 
-        3. Lint a specific sections and options:
+        3. Lint specific sections and options:
             airflow config lint --section smtp --option smtp_user
 
-        4. Ignore a sections:
-            irflow config lint --ignore-section webserver,api
+        4. Ignore a section:
+            airflow config lint --ignore-section webserver,api
 
         5. Ignore an options:
             airflow config lint --ignore-option smtp_user,session_lifetime_days
@@ -485,8 +818,11 @@ def lint_config(args) -> None:
         if configuration.config.section in ignore_sections or 
configuration.config.option in ignore_options:
             continue
 
-        if conf.has_option(configuration.config.section, 
configuration.config.option):
-            lint_issues.append(configuration.message)
+        if conf.has_option(
+            configuration.config.section, configuration.config.option, 
lookup_from_deprecated=False
+        ):
+            if configuration.message is not None:
+                lint_issues.append(configuration.message)
 
     if lint_issues:
         console.print("[red]Found issues in your airflow.cfg:[/red]")
@@ -499,3 +835,179 @@ def lint_config(args) -> None:
         console.print("\n[red]Please update your configuration file 
accordingly.[/red]")
     else:
         console.print("[green]No issues found in your airflow.cfg. It is ready 
for Airflow 3![/green]")
+
+
+@providers_configuration_loaded
+def update_config(args) -> None:
+    """
+    Update the airflow.cfg file to migrate configuration changes from Airflow 
2.x to Airflow 3.
+
+    By default, this command will perform a dry-run (showing the changes only) 
and list only
+    the breaking configuration changes by scanning the current configuration 
file for parameters that have
+    been renamed, removed, or had their default values changed in Airflow 3.0. 
To see or fix all recommended
+    changes, use the --all-recommendations argument. To automatically update 
your airflow.cfg file, use
+    the --fix argument. This command cleans up the existing comments in 
airflow.cfg but creates a backup of
+    the old airflow.cfg file.
+
+    CLI Arguments:
+        --fix: flag (optional)
+            Automatically fix/apply the breaking changes (or all changes if 
--all-recommendations is also
+            specified)
+            Example: --fix
+
+        --all-recommendations: flag (optional)
+            Include non-breaking (recommended) changes as well as breaking 
ones.
+            Can be used with --fix.
+            Example: --all-recommendations
+
+        --section: str (optional)
+            Comma-separated list of configuration sections to update.
+            Example: --section core,database
+
+        --option: str (optional)
+            Comma-separated list of configuration options to update.
+            Example: --option sql_alchemy_conn,dag_concurrency
+
+        --ignore-section: str (optional)
+            Comma-separated list of configuration sections to ignore during 
update.
+            Example: --ignore-section webserver
+
+        --ignore-option: str (optional)
+            Comma-separated list of configuration options to ignore during 
update.
+            Example: --ignore-option check_slas
+
+    Examples:
+        1. Dry-run mode (print the changes in modified airflow.cfg) showing 
only breaking changes:
+            airflow config update
+
+        2. Dry-run mode showing all recommendations:
+            airflow config update --all-recommendations
+
+        3. Apply (fix) only breaking changes:
+            airflow config update --fix
+
+        4. Apply (fix) all recommended changes:
+            airflow config update --fix --all-recommendations
+
+        5. Show changes only the specific sections:
+            airflow config update --section core,database
+
+        6.Show changes only the specific options:
+            airflow config update --option sql_alchemy_conn,dag_concurrency
+
+        7. Ignores the specific section:
+            airflow config update --ignore-section webserver
+
+    :param args: The CLI arguments for updating configuration.
+    """
+    console = AirflowConsole()
+    changes_applied: list[str] = []
+    modifications = ConfigModifications()
+
+    include_all = args.all_recommendations if args.all_recommendations else 
False
+    apply_fix = args.fix if args.fix else False
+    dry_run = not apply_fix
+    update_sections = args.section if args.section else None
+    update_options = args.option if args.option else None
+    ignore_sections = args.ignore_section if args.ignore_section else []
+    ignore_options = args.ignore_option if args.ignore_option else []
+
+    config_dict = conf.as_dict(
+        display_source=True,
+        include_env=False,
+        include_cmds=False,
+        include_secret=True,
+        display_sensitive=True,
+    )
+    for change in CONFIGS_CHANGES:
+        if not include_all and not change.breaking:
+            continue
+        conf_section = change.config.section.lower()
+        conf_option = change.config.option.lower()
+        full_key = f"{conf_section}.{conf_option}"
+
+        if update_sections is not None and conf_section not in [s.lower() for 
s in update_sections]:
+            continue
+        if update_options is not None and full_key not in [opt.lower() for opt 
in update_options]:
+            continue
+        if conf_section in [s.lower() for s in ignore_sections] or full_key in 
[
+            opt.lower() for opt in ignore_options
+        ]:
+            continue
+
+        if conf_section not in config_dict or conf_option not in 
config_dict[conf_section]:
+            continue
+        value_data = config_dict[conf_section][conf_option]
+        if not (isinstance(value_data, tuple) and value_data[1] == 
"airflow.cfg"):
+            continue
+
+        current_value = value_data[0]
+        prefix = "[[red]BREAKING[/red]]" if change.breaking else 
"[[yellow]Recommended[/yellow]]"
+        if change.default_change:
+            if str(current_value) != str(change.new_default):
+                modifications.add_default_update(conf_section, conf_option, 
str(change.new_default))
+                changes_applied.append(
+                    f"{prefix} Updated default value of 
'{conf_section}/{conf_option}' from "
+                    f"'{current_value}' to '{change.new_default}'."
+                )
+        if change.renamed_to:
+            modifications.add_rename(
+                conf_section, conf_option, change.renamed_to.section, 
change.renamed_to.option
+            )
+            changes_applied.append(
+                f"{prefix} Renamed '{conf_section}/{conf_option}' to "
+                
f"'{change.renamed_to.section.lower()}/{change.renamed_to.option.lower()}'."
+            )
+        elif change.was_removed:
+            if change.remove_if_equals is not None:
+                if str(current_value) == str(change.remove_if_equals):
+                    modifications.add_remove(conf_section, conf_option)
+                    changes_applied.append(
+                        f"{prefix} Removed '{conf_section}/{conf_option}' from 
configuration."
+                    )
+            else:
+                modifications.add_remove(conf_section, conf_option)
+                changes_applied.append(f"{prefix} Removed 
'{conf_section}/{conf_option}' from configuration.")
+
+    backup_path = f"{AIRFLOW_CONFIG}.bak"
+    try:
+        shutil.copy2(AIRFLOW_CONFIG, backup_path)
+        console.print(f"Backup saved as '{backup_path}'.")
+    except Exception as e:
+        console.print(f"Failed to create backup: {e}")
+        raise AirflowConfigException("Backup creation failed. Aborting 
update_config operation.")
+
+    if dry_run:
+        console.print("[blue]Dry-run mode enabled. No changes will be written 
to airflow.cfg.[/blue]")
+        with StringIO() as config_output:
+            conf.write_custom_config(
+                file=config_output,
+                comment_out_defaults=True,
+                include_descriptions=True,
+                modifications=modifications,
+            )
+            new_config = config_output.getvalue()
+        console.print(new_config)
+    else:
+        with open(AIRFLOW_CONFIG, "w") as config_file:
+            conf.write_custom_config(
+                file=config_file,
+                comment_out_defaults=True,
+                include_descriptions=True,
+                modifications=modifications,
+            )
+
+    if changes_applied:
+        console.print("[green]The following are the changes in airflow 
config:[/green]")
+        for change_msg in changes_applied:
+            console.print(f"  - {change_msg}")
+        if dry_run:
+            console.print(
+                "[blue]Dry-run is mode enabled. To apply above airflow.cfg run 
the command "
+                "with `--fix`.[/blue]"
+            )
+    else:
+        console.print("[green]No updates needed. Your configuration is already 
up-to-date.[/green]")
+
+    if args.verbose:
+        console.print("[blue]Configuration update completed with verbose 
output enabled.[/blue]")
diff --git a/airflow/config_templates/pre_2_7_defaults.cfg 
b/airflow/config_templates/provider_config_fallback_defaults.cfg
similarity index 66%
rename from airflow/config_templates/pre_2_7_defaults.cfg
rename to airflow/config_templates/provider_config_fallback_defaults.cfg
index 2568d42445f..ba92feaef47 100644
--- a/airflow/config_templates/pre_2_7_defaults.cfg
+++ b/airflow/config_templates/provider_config_fallback_defaults.cfg
@@ -16,15 +16,21 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# This file contains pre Airflow 2.7, provider defaults for Airflow 
configuration.
-# They are provided as fallback option to older version of the
-# providers that might expect them to be present.
+# This file contains provider defaults for Airflow configuration, containing 
fallback default values
+# that might be needed when provider classes are being imported - before 
provider's configuration
+# is loaded.
+#
+# Unfortunately airflow currently performs a lot of stuff during importing and 
some of that might lead
+# to retrieving provider configuration before the defaults for the provider 
are loaded.
+#
+# Those are only defaults, so if you have "real" values configured in your 
configuration (.cfg file or
+# environment variables) those will be used as usual.
+
+# NOTE!! Do NOT attempt to remove those default fallbacks thinking that they 
are unnecessary duplication,
+# at least not until we fix the way how airflow imports "do stuff". This is 
unlikely to succeed.
+#
+# You've been warned!
 #
-# NOTE !!!! Please DO NOT modify values in the file even if they change in 
corresponding
-# providers. The values here should be treated as "read only" and should not 
be modified
-# even if defaults in newer versions of corresponding Providers change.
-# They are only here so that backwards compatible behaviour for old provider
-# versions can be maintained.
 
 [atlas]
 sasl_enabled = False
@@ -81,6 +87,29 @@ index_patterns = _all
 use_ssl = False
 verify_certs = True
 
+[opensearch]
+host =
+port =
+username =
+password =
+log_id_template = {dag_id}-{task_id}-{run_id}-{map_index}-{try_number}
+end_of_log_mark = end_of_log
+write_stdout = False
+json_format = False
+json_fields = asctime, filename, lineno, levelname, message
+host_field = host
+offset_field = offset
+index_patterns = _all
+index_patterns_callable =
+
+[opensearch_configs]
+http_compress = False
+use_ssl = False
+verify_certs = False
+ssl_assert_hostname = False
+ssl_show_warn = False
+ca_certs =
+
 [kubernetes_executor]
 api_client_retry_configuration =
 logs_task_metadata = False
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 3affeb3c24d..fdb8e3821dd 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import builtins
 import contextlib
 import datetime
 import functools
@@ -72,6 +73,30 @@ ConfigSourcesType = Dict[str, ConfigSectionSourcesType]
 ENV_VAR_PREFIX = "AIRFLOW__"
 
 
+class ConfigModifications:
+    """
+    Holds modifications to be applied when writing out the config.
+
+    :param rename: Mapping from (old_section, old_option) to (new_section, 
new_option)
+    :param remove: Set of (section, option) to remove
+    :param default_updates: Mapping from (section, option) to new default value
+    """
+
+    def __init__(self) -> None:
+        self.rename: dict[tuple[str, str], tuple[str, str]] = {}
+        self.remove: builtins.set[tuple[str, str]] = builtins.set()  # mypy is 
conflicting with conf set
+        self.default_updates: dict[tuple[str, str], str] = {}
+
+    def add_rename(self, old_section: str, old_option: str, new_section: str, 
new_option: str) -> None:
+        self.rename[(old_section, old_option)] = (new_section, new_option)
+
+    def add_remove(self, section: str, option: str) -> None:
+        self.remove.add((section, option))
+
+    def add_default_update(self, section: str, option: str, new_default: str) 
-> None:
+        self.default_updates[(section, option)] = new_default
+
+
 def _parse_sqlite_version(s: str) -> tuple[int, ...]:
     match = _SQLITE3_VERSION_PATTERN.match(s)
     if match is None:
@@ -207,7 +232,7 @@ class AirflowConfigParser(ConfigParser):
         # interpolation placeholders. The _default_values config parser will 
interpolate them
         # properly when we call get() on it.
         self._default_values = 
create_default_config_parser(self.configuration_description)
-        self._pre_2_7_default_values = create_pre_2_7_defaults()
+        self._provider_config_fallback_default_values = 
create_provider_config_fallback_defaults()
         if default_config is not None:
             self._update_defaults_from_string(default_config)
         self._update_logging_deprecated_template_to_one_from_defaults()
@@ -290,9 +315,9 @@ class AirflowConfigParser(ConfigParser):
             return value.replace("%", "%%")
         return value
 
-    def get_default_pre_2_7_value(self, section: str, key: str, **kwargs) -> 
Any:
-        """Get pre 2.7 default config values."""
-        return self._pre_2_7_default_values.get(section, key, fallback=None, 
**kwargs)
+    def get_provider_config_fallback_defaults(self, section: str, key: str, 
**kwargs) -> Any:
+        """Get provider config fallback default values."""
+        return self._provider_config_fallback_default_values.get(section, key, 
fallback=None, **kwargs)
 
     # These configuration elements can be fetched as the stdout of commands
     # following the "{section}__{name}_cmd" pattern, the idea behind this
@@ -636,6 +661,88 @@ class AirflowConfigParser(ConfigParser):
         if needs_separation:
             file.write("\n")
 
+    def write_custom_config(
+        self,
+        file: IO[str],
+        comment_out_defaults: bool = True,
+        include_descriptions: bool = True,
+        extra_spacing: bool = True,
+        modifications: ConfigModifications | None = None,
+    ) -> None:
+        """
+        Write a configuration file using a ConfigModifications object.
+
+        This method includes only options from the current airflow.cfg. For 
each option:
+          - If it's marked for removal, omit it.
+          - If renamed, output it under its new name and add a comment 
indicating its original location.
+          - If a default update is specified, apply the new default and output 
the option as a commented line.
+          - Otherwise, if the current value equals the default and 
comment_out_defaults is True, output it as a comment.
+        Options absent from the current airflow.cfg are omitted.
+
+        :param file: File to write the configuration.
+        :param comment_out_defaults: If True, options whose value equals the 
default are written as comments.
+        :param include_descriptions: Whether to include section descriptions.
+        :param extra_spacing: Whether to insert an extra blank line after each 
option.
+        :param modifications: ConfigModifications instance with rename, 
remove, and default updates.
+        """
+        modifications = modifications or ConfigModifications()
+        output: dict[str, list[tuple[str, str, bool, str]]] = {}
+
+        for section in self._sections:  # type: ignore[attr-defined]  # 
accessing _sections from ConfigParser
+            for option, orig_value in self._sections[section].items():  # 
type: ignore[attr-defined]
+                key = (section.lower(), option.lower())
+                if key in modifications.remove:
+                    continue
+
+                mod_comment = ""
+                if key in modifications.rename:
+                    new_sec, new_opt = modifications.rename[key]
+                    effective_section = new_sec
+                    effective_option = new_opt
+                    mod_comment += f"# Renamed from {section}.{option}\n"
+                else:
+                    effective_section = section
+                    effective_option = option
+
+                value = orig_value
+                if key in modifications.default_updates:
+                    mod_comment += (
+                        f"# Default updated from {orig_value} to 
{modifications.default_updates[key]}\n"
+                    )
+                    value = modifications.default_updates[key]
+
+                default_value = self.get_default_value(effective_section, 
effective_option, fallback="")
+                is_default = str(value) == str(default_value)
+                output.setdefault(effective_section.lower(), []).append(
+                    (effective_option, str(value), is_default, mod_comment)
+                )
+
+        for section, options in output.items():
+            section_buffer = StringIO()
+            section_buffer.write(f"[{section}]\n")
+            if include_descriptions:
+                description = self.configuration_description.get(section, 
{}).get("description", "")
+                if description:
+                    for line in description.splitlines():
+                        section_buffer.write(f"# {line}\n")
+                    section_buffer.write("\n")
+            for option, value_str, is_default, mod_comment in options:
+                key = (section.lower(), option.lower())
+                if key in modifications.default_updates and 
comment_out_defaults:
+                    section_buffer.write(f"# {option} = {value_str}\n")
+                else:
+                    if mod_comment:
+                        section_buffer.write(mod_comment)
+                    if is_default and comment_out_defaults:
+                        section_buffer.write(f"# {option} = {value_str}\n")
+                    else:
+                        section_buffer.write(f"{option} = {value_str}\n")
+                if extra_spacing:
+                    section_buffer.write("\n")
+            content = section_buffer.getvalue().strip()
+            if content:
+                file.write(f"{content}\n\n")
+
     def write(  # type: ignore[override]
         self,
         file: IO[str],
@@ -961,58 +1068,62 @@ class AirflowConfigParser(ConfigParser):
         section: str,
         key: str,
         suppress_warnings: bool = False,
+        lookup_from_deprecated: bool = True,
         _extra_stacklevel: int = 0,
         **kwargs,
     ) -> str | None:
         section = section.lower()
         key = key.lower()
         warning_emitted = False
-        deprecated_section: str | None
-        deprecated_key: str | None
+        deprecated_section: str | None = None
+        deprecated_key: str | None = None
 
-        option_description = self.configuration_description.get(section, 
{}).get(key, {})
-        if option_description.get("deprecated"):
-            deprecation_reason = option_description.get("deprecation_reason", 
"")
-            warnings.warn(
-                f"The '{key}' option in section {section} is deprecated. 
{deprecation_reason}",
-                DeprecationWarning,
-                stacklevel=2 + _extra_stacklevel,
+        if lookup_from_deprecated:
+            option_description = (
+                self.configuration_description.get(section, {}).get("options", 
{}).get(key, {})
             )
-        # For when we rename whole sections
-        if section in self.inversed_deprecated_sections:
-            deprecated_section, deprecated_key = (section, key)
-            section = self.inversed_deprecated_sections[section]
-            if not self._suppress_future_warnings:
-                warnings.warn(
-                    f"The config section [{deprecated_section}] has been 
renamed to "
-                    f"[{section}]. Please update your `conf.get*` call to use 
the new name",
-                    FutureWarning,
-                    stacklevel=2 + _extra_stacklevel,
-                )
-            # Don't warn about individual rename if the whole section is 
renamed
-            warning_emitted = True
-        elif (section, key) in self.inversed_deprecated_options:
-            # Handle using deprecated section/key instead of the new 
section/key
-            new_section, new_key = self.inversed_deprecated_options[(section, 
key)]
-            if not self._suppress_future_warnings and not warning_emitted:
+            if option_description.get("deprecated"):
+                deprecation_reason = 
option_description.get("deprecation_reason", "")
                 warnings.warn(
-                    f"section/key [{section}/{key}] has been deprecated, you 
should use"
-                    f"[{new_section}/{new_key}] instead. Please update your 
`conf.get*` call to use the "
-                    "new name",
-                    FutureWarning,
+                    f"The '{key}' option in section {section} is deprecated. 
{deprecation_reason}",
+                    DeprecationWarning,
                     stacklevel=2 + _extra_stacklevel,
                 )
+            # For the cases in which we rename whole sections
+            if section in self.inversed_deprecated_sections:
+                deprecated_section, deprecated_key = (section, key)
+                section = self.inversed_deprecated_sections[section]
+                if not self._suppress_future_warnings:
+                    warnings.warn(
+                        f"The config section [{deprecated_section}] has been 
renamed to "
+                        f"[{section}]. Please update your `conf.get*` call to 
use the new name",
+                        FutureWarning,
+                        stacklevel=2 + _extra_stacklevel,
+                    )
+                # Don't warn about individual rename if the whole section is 
renamed
                 warning_emitted = True
-            deprecated_section, deprecated_key = section, key
-            section, key = (new_section, new_key)
-        elif section in self.deprecated_sections:
-            # When accessing the new section name, make sure we check under 
the old config name
-            deprecated_key = key
-            deprecated_section = self.deprecated_sections[section][0]
-        else:
-            deprecated_section, deprecated_key, _ = 
self.deprecated_options.get(
-                (section, key), (None, None, None)
-            )
+            elif (section, key) in self.inversed_deprecated_options:
+                # Handle using deprecated section/key instead of the new 
section/key
+                new_section, new_key = 
self.inversed_deprecated_options[(section, key)]
+                if not self._suppress_future_warnings and not warning_emitted:
+                    warnings.warn(
+                        f"section/key [{section}/{key}] has been deprecated, 
you should use"
+                        f"[{new_section}/{new_key}] instead. Please update 
your `conf.get*` call to use the "
+                        "new name",
+                        FutureWarning,
+                        stacklevel=2 + _extra_stacklevel,
+                    )
+                    warning_emitted = True
+                deprecated_section, deprecated_key = section, key
+                section, key = (new_section, new_key)
+            elif section in self.deprecated_sections:
+                # When accessing the new section name, make sure we check 
under the old config name
+                deprecated_key = key
+                deprecated_section = self.deprecated_sections[section][0]
+            else:
+                deprecated_section, deprecated_key, _ = 
self.deprecated_options.get(
+                    (section, key), (None, None, None)
+                )
         # first check environment variables
         option = self._get_environment_variables(
             deprecated_key,
@@ -1066,9 +1177,9 @@ class AirflowConfigParser(ConfigParser):
         if self.get_default_value(section, key) is not None or "fallback" in 
kwargs:
             return expand_env_var(self.get_default_value(section, key, 
**kwargs))
 
-        if self.get_default_pre_2_7_value(section, key) is not None:
+        if self.get_provider_config_fallback_defaults(section, key) is not 
None:
             # no expansion needed
-            return self.get_default_pre_2_7_value(section, key, **kwargs)
+            return self.get_provider_config_fallback_defaults(section, key, 
**kwargs)
 
         if not suppress_warnings:
             log.warning("section/key [%s/%s] not found in config", section, 
key)
@@ -1317,7 +1428,7 @@ class AirflowConfigParser(ConfigParser):
         """
         super().read_dict(dictionary=dictionary, source=source)
 
-    def has_option(self, section: str, option: str) -> bool:
+    def has_option(self, section: str, option: str, lookup_from_deprecated: 
bool = True) -> bool:
         """
         Check if option is defined.
 
@@ -1326,10 +1437,18 @@ class AirflowConfigParser(ConfigParser):
 
         :param section: section to get option from
         :param option: option to get
+        :param lookup_from_deprecated: If True, check if the option is defined 
in deprecated sections
         :return:
         """
         try:
-            value = self.get(section, option, fallback=None, 
_extra_stacklevel=1, suppress_warnings=True)
+            value = self.get(
+                section,
+                option,
+                fallback=None,
+                _extra_stacklevel=1,
+                suppress_warnings=True,
+                lookup_from_deprecated=lookup_from_deprecated,
+            )
             if value is None:
                 return False
             return True
@@ -1459,7 +1578,7 @@ class AirflowConfigParser(ConfigParser):
 
         # We check sequentially all those sources and the last one we saw it 
in will "win"
         configs: Iterable[tuple[str, ConfigParser]] = [
-            ("default-pre-2-7", self._pre_2_7_default_values),
+            ("provider-fallback-defaults", 
self._provider_config_fallback_default_values),
             ("default", self._default_values),
             ("airflow.cfg", self),
         ]
@@ -1985,17 +2104,27 @@ def 
create_default_config_parser(configuration_description: dict[str, dict[str,
     return parser
 
 
-def create_pre_2_7_defaults() -> ConfigParser:
+def create_provider_config_fallback_defaults() -> ConfigParser:
     """
-    Create parser using the old defaults from Airflow < 2.7.0.
+    Create fallback defaults.
+
+    This parser contains provider defaults for Airflow configuration, 
containing fallback default values
+    that might be needed when provider classes are being imported - before 
provider's configuration
+    is loaded.
+
+    Unfortunately airflow currently performs a lot of stuff during importing 
and some of that might lead
+    to retrieving provider configuration before the defaults for the provider 
are loaded.
+
+    Those are only defaults, so if you have "real" values configured in your 
configuration (.cfg file or
+    environment variables) those will be used as usual.
+
+    NOTE!! Do NOT attempt to remove those default fallbacks thinking that they 
are unnecessary duplication,
+    at least not until we fix the way how airflow imports "do stuff". This is 
unlikely to succeed.
 
-    This is used in order to be able to fall-back to those defaults when old 
version of provider,
-    not supporting "config contribution" is installed with Airflow 2.7.0+. 
This "default"
-    configuration does not support variable expansion, those are pretty much 
hard-coded defaults '
-    we want to fall-back to in such case.
+    You've been warned!
     """
     config_parser = ConfigParser()
-    config_parser.read(_default_config_file_path("pre_2_7_defaults.cfg"))
+    
config_parser.read(_default_config_file_path("provider_config_fallback_defaults.cfg"))
     return config_parser
 
 
diff --git a/tests/cli/commands/test_config_command.py 
b/tests/cli/commands/test_config_command.py
index 5e87b90e31f..8330127c79d 100644
--- a/tests/cli/commands/test_config_command.py
+++ b/tests/cli/commands/test_config_command.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 import contextlib
 import os
 import re
+import shutil
 from io import StringIO
 from unittest import mock
 
@@ -27,6 +28,7 @@ import pytest
 from airflow.cli import cli_parser
 from airflow.cli.commands import config_command
 from airflow.cli.commands.config_command import ConfigChange, ConfigParameter
+from airflow.configuration import conf
 from tests.test_utils.config import conf_vars
 
 STATSD_CONFIG_BEGIN_WITH = "# `StatsD <https://github.com/statsd/statsd>`"
@@ -233,7 +235,10 @@ class TestCliConfigGetValue:
 
 
 class TestConfigLint:
-    @pytest.mark.parametrize("removed_config", config_command.CONFIGS_CHANGES)
+    @pytest.mark.parametrize(
+        "removed_config",
+        [config for config in config_command.CONFIGS_CHANGES if 
config.was_removed and config.message],
+    )
     def test_lint_detects_removed_configs(self, removed_config):
         with mock.patch("airflow.configuration.conf.has_option", 
return_value=True):
             with contextlib.redirect_stdout(StringIO()) as temp_stdout:
@@ -246,6 +251,23 @@ class TestConfigLint:
 
         assert normalized_message in normalized_output
 
+    @pytest.mark.parametrize(
+        "default_changed_config",
+        [config for config in config_command.CONFIGS_CHANGES if 
config.default_change],
+    )
+    def test_lint_detects_default_changed_configs(self, 
default_changed_config):
+        with mock.patch("airflow.configuration.conf.has_option", 
return_value=True):
+            with contextlib.redirect_stdout(StringIO()) as temp_stdout:
+                
config_command.lint_config(cli_parser.get_parser().parse_args(["config", 
"lint"]))
+
+            output = temp_stdout.getvalue()
+
+        if default_changed_config.message is not None:
+            normalized_output = re.sub(r"\s+", " ", output.strip())
+            normalized_message = re.sub(r"\s+", " ", 
default_changed_config.message.strip())
+
+            assert normalized_message in normalized_output
+
     @pytest.mark.parametrize(
         "section, option, suggestion",
         [
@@ -316,7 +338,8 @@ class TestConfigLint:
     def test_lint_detects_multiple_issues(self):
         with mock.patch(
             "airflow.configuration.conf.has_option",
-            side_effect=lambda s, o: o in ["check_slas", 
"strict_dataset_uri_validation"],
+            side_effect=lambda section, option, lookup_from_deprecated: option
+            in ["check_slas", "strict_dataset_uri_validation"],
         ):
             with contextlib.redirect_stdout(StringIO()) as temp_stdout:
                 
config_command.lint_config(cli_parser.get_parser().parse_args(["config", 
"lint"]))
@@ -417,7 +440,7 @@ class TestConfigLint:
                 "Removed deprecated `check_slas` configuration parameter from 
`core` section.",
             ),
             (
-                "AIRFLOW__CORE__STRICT_ASSET_URI_VALIDATION",
+                "AIRFLOW__CORE__strict_dataset_uri_validation",
                 ConfigChange(
                     config=ConfigParameter("core", 
"strict_dataset_uri_validation"),
                     suggestion="Dataset URI with a defined scheme will now 
always be validated strictly, raising a hard error on validation failure.",
@@ -438,3 +461,122 @@ class TestConfigLint:
 
         assert expected_message in normalized_output
         assert config_change.suggestion in normalized_output
+
+    def test_lint_detects_invalid_config(self):
+        with mock.patch.dict(os.environ, {"AIRFLOW__CORE__PARALLELISM": "0"}):
+            with contextlib.redirect_stdout(StringIO()) as temp_stdout:
+                
config_command.lint_config(cli_parser.get_parser().parse_args(["config", 
"lint"]))
+
+            output = temp_stdout.getvalue()
+
+        normalized_output = re.sub(r"\s+", " ", output.strip())
+
+        assert (
+            "Invalid value `0` set for `parallelism` configuration parameter 
in `core` section."
+            in normalized_output
+        )
+
+    def test_lint_detects_invalid_config_negative(self):
+        with mock.patch.dict(os.environ, {"AIRFLOW__CORE__PARALLELISM": "42"}):
+            with contextlib.redirect_stdout(StringIO()) as temp_stdout:
+                
config_command.lint_config(cli_parser.get_parser().parse_args(["config", 
"lint"]))
+
+            output = temp_stdout.getvalue()
+
+        normalized_output = re.sub(r"\s+", " ", output.strip())
+
+        assert "Invalid value" not in normalized_output
+
+
+class TestCliConfigUpdate:
+    @classmethod
+    def setup_class(cls):
+        cls.parser = cli_parser.get_parser()
+
+    @pytest.fixture(autouse=True)
+    def setup_fake_airflow_cfg(self, tmp_path, monkeypatch):
+        fake_config = tmp_path / "airflow.cfg"
+        fake_config.write_text(
+            """
+            [test_admin]
+            rename_key = legacy_value
+            remove_key = to_be_removed
+            [test_core]
+            dags_folder = /some/path/to/dags
+            default_key = OldDefault"""
+        )
+        monkeypatch.setenv("AIRFLOW_CONFIG", str(fake_config))
+        monkeypatch.setattr(config_command, "AIRFLOW_CONFIG", str(fake_config))
+        conf.read(str(fake_config))
+        return fake_config
+
+    def test_update_renamed_option(self, monkeypatch, setup_fake_airflow_cfg):
+        fake_config = setup_fake_airflow_cfg
+        renamed_change = ConfigChange(
+            config=ConfigParameter("test_admin", "rename_key"),
+            renamed_to=ConfigParameter("test_core", "renamed_key"),
+        )
+        monkeypatch.setattr(config_command, "CONFIGS_CHANGES", 
[renamed_change])
+        assert conf.has_option("test_admin", "rename_key")
+        args = self.parser.parse_args(["config", "update", "--fix", 
"--all-recommendations"])
+        config_command.update_config(args)
+        content = fake_config.read_text()
+        admin_section = content.split("[test_admin]")[-1]
+        assert "rename_key" not in admin_section
+        core_section = content.split("[test_core]")[-1]
+        assert "renamed_key" in core_section
+        assert "# Renamed from test_admin.rename_key" in content
+
+    def test_update_removed_option(self, monkeypatch, setup_fake_airflow_cfg):
+        fake_config = setup_fake_airflow_cfg
+        removed_change = ConfigChange(
+            config=ConfigParameter("test_admin", "remove_key"),
+            suggestion="Option removed in Airflow 3.0.",
+        )
+        monkeypatch.setattr(config_command, "CONFIGS_CHANGES", 
[removed_change])
+        assert conf.has_option("test_admin", "remove_key")
+        args = self.parser.parse_args(["config", "update", "--fix", 
"--all-recommendations"])
+        config_command.update_config(args)
+        content = fake_config.read_text()
+        assert "remove_key" not in content
+
+    def test_update_no_changes(self, monkeypatch, capsys):
+        monkeypatch.setattr(config_command, "CONFIGS_CHANGES", [])
+        args = self.parser.parse_args(["config", "update"])
+        config_command.update_config(args)
+        captured = capsys.readouterr().out
+        assert "No updates needed" in captured
+
+    def test_update_backup_creation(self, monkeypatch):
+        removed_change = ConfigChange(
+            config=ConfigParameter("test_admin", "remove_key"),
+            suggestion="Option removed.",
+        )
+        monkeypatch.setattr(config_command, "CONFIGS_CHANGES", 
[removed_change])
+        assert conf.has_option("test_admin", "remove_key")
+        args = self.parser.parse_args(["config", "update"])
+        mock_copy = mock.MagicMock()
+        monkeypatch.setattr(shutil, "copy2", mock_copy)
+        config_command.update_config(args)
+        backup_path = os.environ.get("AIRFLOW_CONFIG") + ".bak"
+        mock_copy.assert_called_once_with(os.environ.get("AIRFLOW_CONFIG"), 
backup_path)
+
+    def test_update_only_breaking_changes_with_fix(self, monkeypatch, 
setup_fake_airflow_cfg):
+        fake_config = setup_fake_airflow_cfg
+        breaking_change = ConfigChange(
+            config=ConfigParameter("test_admin", "rename_key"),
+            renamed_to=ConfigParameter("test_admin", "new_breaking_key"),
+            breaking=True,
+        )
+        non_breaking_change = ConfigChange(
+            config=ConfigParameter("test_admin", "remove_key"),
+            suggestion="Option removed.",
+            breaking=False,
+        )
+        monkeypatch.setattr(config_command, "CONFIGS_CHANGES", 
[breaking_change, non_breaking_change])
+        args = self.parser.parse_args(["config", "update", "--fix"])
+        config_command.update_config(args)
+        content = fake_config.read_text()
+        assert "rename_key = legacy_value" not in content
+        assert "new_breaking_key" in content
+        assert "remove_key" in content
diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py
index d48c4665310..cceef50a313 100644
--- a/tests/core/test_configuration.py
+++ b/tests/core/test_configuration.py
@@ -954,6 +954,48 @@ class TestDeprecatedConf:
             with pytest.warns(DeprecationWarning), conf_vars({("celery", 
"celeryd_concurrency"): "99"}):
                 assert conf.getint("celery", "worker_concurrency") == 99
 
+    @pytest.mark.parametrize(
+        "deprecated_options_dict, kwargs, new_section_expected_value, 
old_section_expected_value",
+        [
+            pytest.param(
+                {("old_section", "old_key"): ("new_section", "new_key", 
"2.0.0")},
+                {"fallback": None},
+                None,
+                "value",
+                id="deprecated_in_different_section_lookup_enabled",
+            ),
+            pytest.param(
+                {("old_section", "old_key"): ("new_section", "new_key", 
"2.0.0")},
+                {"fallback": None, "lookup_from_deprecated": False},
+                None,
+                None,
+                id="deprecated_in_different_section_lookup_disabled",
+            ),
+            pytest.param(
+                {("new_section", "old_key"): ("new_section", "new_key", 
"2.0.0")},
+                {"fallback": None},
+                "value",
+                None,
+                id="deprecated_in_same_section_lookup_enabled",
+            ),
+            pytest.param(
+                {("new_section", "old_key"): ("new_section", "new_key", 
"2.0.0")},
+                {"fallback": None, "lookup_from_deprecated": False},
+                None,
+                None,
+                id="deprecated_in_same_section_lookup_disabled",
+            ),
+        ],
+    )
+    def test_deprecated_options_with_lookup_from_deprecated(
+        self, deprecated_options_dict, kwargs, new_section_expected_value, 
old_section_expected_value
+    ):
+        with conf_vars({("new_section", "new_key"): "value"}):
+            with 
set_deprecated_options(deprecated_options=deprecated_options_dict):
+                assert conf.get("new_section", "old_key", **kwargs) == 
new_section_expected_value
+
+                assert conf.get("old_section", "old_key", **kwargs) == 
old_section_expected_value
+
     @conf_vars(
         {
             ("logging", "logging_level"): None,

Reply via email to