This is an automated email from the ASF dual-hosted git repository.
phanikumv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new aec5f04e104 Add a CLI command `airflow config update` to update the
config changes from airflow 2.x to 3. (#48403)
aec5f04e104 is described below
commit aec5f04e10443e2d1f43f6ce9c52d74dc67c55fd
Author: Ankit Chaurasia <[email protected]>
AuthorDate: Thu Apr 3 13:16:07 2025 +0545
Add a CLI command `airflow config update` to update the config changes from
airflow 2.x to 3. (#48403)
---
airflow-core/src/airflow/cli/cli_config.py | 35 +++++
.../src/airflow/cli/commands/config_command.py | 146 ++++++++++++++++++++-
airflow-core/src/airflow/configuration.py | 106 +++++++++++++++
.../tests/unit/cli/commands/test_config_command.py | 76 +++++++++++
4 files changed, 362 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/cli/cli_config.py
b/airflow-core/src/airflow/cli/cli_config.py
index 3d7a3de5293..705d653e079 100644
--- a/airflow-core/src/airflow/cli/cli_config.py
+++ b/airflow-core/src/airflow/cli/cli_config.py
@@ -791,6 +791,28 @@ ARG_OPTIONAL_SECTION = Arg(
help="The section name",
)
+# config update
+ARG_UPDATE_CONFIG_SECTION = Arg(
+ ("--section",),
+ help="The section name(s) to update in the airflow config.",
+ type=string_list_type,
+)
+ARG_UPDATE_CONFIG_OPTION = Arg(
+ ("--option",),
+ help="The option name(s) to update in the airflow config.",
+ type=string_list_type,
+)
+ARG_UPDATE_CONFIG_IGNORE_SECTION = Arg(
+ ("--ignore-section",),
+ help="The section name(s) to ignore to update in the airflow config.",
+ type=string_list_type,
+)
+ARG_UPDATE_CONFIG_IGNORE_OPTION = Arg(
+ ("--ignore-option",),
+ help="The option name(s) to ignore to update in the airflow config.",
+ type=string_list_type,
+)
+
# jobs check
ARG_JOB_TYPE_FILTER = Arg(
("--job-type",),
@@ -1626,6 +1648,19 @@ CONFIG_COMMANDS = (
ARG_VERBOSE,
),
),
+ ActionCommand(
+ name="update",
+ help="update options for the configuration changes while migrating
from Airflow 2.x to Airflow 3.0",
+
func=lazy_load_command("airflow.cli.commands.config_command.update_config"),
+ args=(
+ ARG_UPDATE_CONFIG_SECTION,
+ ARG_UPDATE_CONFIG_OPTION,
+ ARG_UPDATE_CONFIG_IGNORE_SECTION,
+ ARG_UPDATE_CONFIG_IGNORE_OPTION,
+ ARG_DRY_RUN,
+ ARG_VERBOSE,
+ ),
+ ),
)
JOBS_COMMANDS = (
diff --git a/airflow-core/src/airflow/cli/commands/config_command.py
b/airflow-core/src/airflow/cli/commands/config_command.py
index 6ba7da0c877..34ba6aaa0e1 100644
--- a/airflow-core/src/airflow/cli/commands/config_command.py
+++ b/airflow-core/src/airflow/cli/commands/config_command.py
@@ -18,6 +18,7 @@
from __future__ import annotations
+import shutil
from dataclasses import dataclass
from io import StringIO
from typing import Any, NamedTuple
@@ -26,7 +27,7 @@ import pygments
from pygments.lexers.configs import IniLexer
from airflow.cli.simple_table import AirflowConsole
-from airflow.configuration import conf
+from airflow.configuration import AIRFLOW_CONFIG, ConfigModifications, conf
from airflow.exceptions import AirflowConfigException
from airflow.utils.cli import should_use_colors
from airflow.utils.code_utils import get_terminal_formatter
@@ -719,3 +720,146 @@ def lint_config(args) -> None:
console.print("\n[red]Please update your configuration file
accordingly.[/red]")
else:
console.print("[green]No issues found in your airflow.cfg. It is ready
for Airflow 3![/green]")
+
+
+@providers_configuration_loaded
+def update_config(args) -> None:
+ """
+ Update the airflow.cfg file to migrate configuration changes from Airflow
2.x to Airflow 3.
+
+ This command scans the current configuration file for parameters that have
been renamed,
+ removed, or had their default values changed in Airflow 3.0, and
automatically updates
+ the configuration file. This command cleans up the existing comments in
airflow.cfg
+
+ CLI Arguments:
+ --dry-run: flag (optional)
+ Dry-run mode (print the changes without modifying airflow.cfg)
+ Example: --dry-run
+
+ --section: str (optional)
+ Comma-separated list of configuration sections to update.
+ Example: --section core,database
+
+ --option: str (optional)
+ Comma-separated list of configuration options to update.
+ Example: --option sql_alchemy_conn,dag_concurrency
+
+ --ignore-section: str (optional)
+ Comma-separated list of configuration sections to ignore during
update.
+ Example: --ignore-section webserver
+
+ --ignore-option: str (optional)
+ Comma-separated list of configuration options to ignore during
update.
+ Example: --ignore-option check_slas
+
+ Examples:
+ 1. Dry-run mode (print the changes in modified airflow.cfg):
+ airflow config update --dry-run
+
+ 2. Update the entire configuration file:
+ airflow config update
+
+ 3. Update only the 'core' and 'database' sections:
+ airflow config update --section core,database
+
+ 4. Update only specific options:
+ airflow config update --option sql_alchemy_conn,dag_concurrency
+
+ 5. Ignore updates for a specific section:
+ airflow config update --ignore-section webserver
+
+ :param args: The CLI arguments for updating configuration.
+ """
+ console = AirflowConsole()
+ changes_applied: list[str] = []
+ modifications = ConfigModifications()
+
+ update_sections = args.section if args.section else None
+ update_options = args.option if args.option else None
+ ignore_sections = args.ignore_section if args.ignore_section else []
+ ignore_options = args.ignore_option if args.ignore_option else []
+
+ config_dict = conf.as_dict(
+ display_source=True,
+ include_env=True,
+ include_cmds=True,
+ include_secret=True,
+ )
+ for change in CONFIGS_CHANGES:
+ conf_section = change.config.section.lower()
+ conf_option = change.config.option.lower()
+ full_key = f"{conf_section}.{conf_option}"
+
+ if update_sections is not None and conf_section not in [s.lower() for
s in update_sections]:
+ continue
+ if update_options is not None and full_key not in [opt.lower() for opt
in update_options]:
+ continue
+ if conf_section in [s.lower() for s in ignore_sections] or full_key in
[
+ opt.lower() for opt in ignore_options
+ ]:
+ continue
+
+ if conf_section not in config_dict or conf_option not in
config_dict[conf_section]:
+ continue
+ value_data = config_dict[conf_section][conf_option]
+ if not (isinstance(value_data, tuple) and value_data[1] ==
"airflow.cfg"):
+ continue
+
+ current_value = value_data[0]
+
+ if change.default_change:
+ if str(current_value) != str(change.new_default):
+ modifications.add_default_update(conf_section, conf_option,
str(change.new_default))
+ changes_applied.append(
+ f"Updated default value of '{conf_section}/{conf_option}'
from "
+ f"'{current_value}' to '{change.new_default}'."
+ )
+ if change.renamed_to:
+ modifications.add_rename(
+ conf_section, conf_option, change.renamed_to.section,
change.renamed_to.option
+ )
+ changes_applied.append(
+ f"Renamed '{conf_section}/{conf_option}' to "
+
f"'{change.renamed_to.section.lower()}/{change.renamed_to.option.lower()}'."
+ )
+ elif change.was_removed:
+ modifications.add_remove(conf_section, conf_option)
+ changes_applied.append(f"Removed '{conf_section}/{conf_option}'
from configuration.")
+
+ backup_path = f"{AIRFLOW_CONFIG}.bak"
+ try:
+ shutil.copy2(AIRFLOW_CONFIG, backup_path)
+ console.print(f"Backup saved as '{backup_path}'.")
+ except Exception as e:
+ console.print(f"Failed to create backup: {e}")
+ raise AirflowConfigException("Backup creation failed. Aborting
update_config operation.")
+
+ if args.dry_run:
+ console.print("[blue]Dry-run mode enabled. No changes will be written
to airflow.cfg.[/blue]")
+ with StringIO() as config_output:
+ conf.write_custom_config(
+ file=config_output,
+ comment_out_defaults=True,
+ include_descriptions=True,
+ modifications=modifications,
+ )
+ new_config = config_output.getvalue()
+ console.print(new_config)
+ else:
+ with open(AIRFLOW_CONFIG, "w") as config_file:
+ conf.write_custom_config(
+ file=config_file,
+ comment_out_defaults=True,
+ include_descriptions=True,
+ modifications=modifications,
+ )
+
+ if changes_applied:
+ console.print("[green]The following updates were applied:[/green]")
+ for change_msg in changes_applied:
+ console.print(f" - {change_msg}")
+ else:
+ console.print("[green]No updates needed. Your configuration is already
up-to-date.[/green]")
+
+ if args.verbose:
+ console.print("[blue]Configuration update completed with verbose
output enabled.[/blue]")
diff --git a/airflow-core/src/airflow/configuration.py
b/airflow-core/src/airflow/configuration.py
index e9df545d14f..81512280695 100644
--- a/airflow-core/src/airflow/configuration.py
+++ b/airflow-core/src/airflow/configuration.py
@@ -73,6 +73,30 @@ ConfigSourcesType = dict[str, ConfigSectionSourcesType]
ENV_VAR_PREFIX = "AIRFLOW__"
+class ConfigModifications:
+ """
+ Holds modifications to be applied when writing out the config.
+
+ :param rename: Mapping from (old_section, old_option) to (new_section,
new_option)
+ :param remove: Set of (section, option) to remove
+ :param default_updates: Mapping from (section, option) to new default value
+ """
+
+ def __init__(self) -> None:
+ self.rename: dict[tuple[str, str], tuple[str, str]] = {}
+ self.remove: set[tuple[str, str]] = set()
+ self.default_updates: dict[tuple[str, str], str] = {}
+
+ def add_rename(self, old_section: str, old_option: str, new_section: str,
new_option: str) -> None:
+ self.rename[(old_section, old_option)] = (new_section, new_option)
+
+ def add_remove(self, section: str, option: str) -> None:
+ self.remove.add((section, option))
+
+ def add_default_update(self, section: str, option: str, new_default: str)
-> None:
+ self.default_updates[(section, option)] = new_default
+
+
def _parse_sqlite_version(s: str) -> tuple[int, ...]:
match = _SQLITE3_VERSION_PATTERN.match(s)
if match is None:
@@ -549,6 +573,88 @@ class AirflowConfigParser(ConfigParser):
if needs_separation:
file.write("\n")
+ def write_custom_config(
+ self,
+ file: IO[str],
+ comment_out_defaults: bool = True,
+ include_descriptions: bool = True,
+ extra_spacing: bool = True,
+ modifications: ConfigModifications | None = None,
+ ) -> None:
+ """
+ Write a configuration file using a ConfigModifications object.
+
+ This method includes only options from the current airflow.cfg. For
each option:
+ - If it's marked for removal, omit it.
+ - If renamed, output it under its new name and add a comment
indicating its original location.
+ - If a default update is specified, apply the new default and output
the option as a commented line.
+ - Otherwise, if the current value equals the default and
comment_out_defaults is True, output it as a comment.
+ Options absent from the current airflow.cfg are omitted.
+
+ :param file: File to write the configuration.
+ :param comment_out_defaults: If True, options whose value equals the
default are written as comments.
+ :param include_descriptions: Whether to include section descriptions.
+ :param extra_spacing: Whether to insert an extra blank line after each
option.
+ :param modifications: ConfigModifications instance with rename,
remove, and default updates.
+ """
+ modifications = modifications or ConfigModifications()
+ output: dict[str, list[tuple[str, str, bool, str]]] = {}
+
+ for section in self._sections: # type: ignore[attr-defined] #
accessing _sections from ConfigParser
+ for option, orig_value in self._sections[section].items(): #
type: ignore[attr-defined]
+ key = (section.lower(), option.lower())
+ if key in modifications.remove:
+ continue
+
+ mod_comment = ""
+ if key in modifications.rename:
+ new_sec, new_opt = modifications.rename[key]
+ effective_section = new_sec
+ effective_option = new_opt
+ mod_comment += f"# Renamed from {section}.{option}\n"
+ else:
+ effective_section = section
+ effective_option = option
+
+ value = orig_value
+ if key in modifications.default_updates:
+ mod_comment += (
+ f"# Default updated from {orig_value} to
{modifications.default_updates[key]}\n"
+ )
+ value = modifications.default_updates[key]
+
+ default_value = self.get_default_value(effective_section,
effective_option, fallback="")
+ is_default = str(value) == str(default_value)
+ output.setdefault(effective_section.lower(), []).append(
+ (effective_option, str(value), is_default, mod_comment)
+ )
+
+ for section, options in output.items():
+ section_buffer = StringIO()
+ section_buffer.write(f"[{section}]\n")
+ if include_descriptions:
+ description = self.configuration_description.get(section,
{}).get("description", "")
+ if description:
+ for line in description.splitlines():
+ section_buffer.write(f"# {line}\n")
+ section_buffer.write("\n")
+ for option, value_str, is_default, mod_comment in options:
+ key = (section.lower(), option.lower())
+ if key in modifications.default_updates and
comment_out_defaults:
+ section_buffer.write(f"# {option} = {value_str}\n")
+ else:
+ if mod_comment:
+ section_buffer.write(mod_comment)
+ if is_default and comment_out_defaults:
+ section_buffer.write(f"# {option} = {value_str}\n")
+ else:
+ section_buffer.write(f"{option} = {value_str}\n")
+ if extra_spacing:
+ section_buffer.write("\n")
+ content = section_buffer.getvalue().strip()
+ if content:
+ file.write(f"{content}\n\n")
+
def write( # type: ignore[override]
self,
file: IO[str],
diff --git a/airflow-core/tests/unit/cli/commands/test_config_command.py
b/airflow-core/tests/unit/cli/commands/test_config_command.py
index 92f4a1ab731..0de54c2b259 100644
--- a/airflow-core/tests/unit/cli/commands/test_config_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_config_command.py
@@ -19,6 +19,7 @@ from __future__ import annotations
import contextlib
import os
import re
+import shutil
from io import StringIO
from unittest import mock
@@ -27,6 +28,7 @@ import pytest
from airflow.cli import cli_parser
from airflow.cli.commands import config_command
from airflow.cli.commands.config_command import ConfigChange, ConfigParameter
+from airflow.configuration import conf
from tests_common.test_utils.config import conf_vars
@@ -491,3 +493,77 @@ class TestConfigLint:
normalized_output = re.sub(r"\s+", " ", output.strip())
assert "Invalid value" not in normalized_output
+
+
+class TestCliConfigUpdate:
+ @classmethod
+ def setup_class(cls):
+ cls.parser = cli_parser.get_parser()
+
+ @pytest.fixture(autouse=True)
+ def setup_fake_airflow_cfg(self, tmp_path, monkeypatch):
+ fake_config = tmp_path / "airflow.cfg"
+ fake_config.write_text(
+ """
+ [test_admin]
+ rename_key = legacy_value
+ remove_key = to_be_removed
+ [test_core]
+ dags_folder = /some/path/to/dags
+ default_key = OldDefault"""
+ )
+ monkeypatch.setenv("AIRFLOW_CONFIG", str(fake_config))
+ monkeypatch.setattr(config_command, "AIRFLOW_CONFIG", str(fake_config))
+ conf.read(str(fake_config))
+ return fake_config
+
+ def test_update_renamed_option(self, monkeypatch, setup_fake_airflow_cfg):
+ fake_config = setup_fake_airflow_cfg
+ renamed_change = ConfigChange(
+ config=ConfigParameter("test_admin", "rename_key"),
+ renamed_to=ConfigParameter("test_core", "renamed_key"),
+ )
+ monkeypatch.setattr(config_command, "CONFIGS_CHANGES",
[renamed_change])
+ assert conf.has_option("test_admin", "rename_key")
+ args = self.parser.parse_args(["config", "update"])
+ config_command.update_config(args)
+ content = fake_config.read_text()
+ admin_section = content.split("[test_admin]")[-1]
+ assert "rename_key" not in admin_section
+ core_section = content.split("[test_core]")[-1]
+ assert "renamed_key" in core_section
+ assert "# Renamed from test_admin.rename_key" in content
+
+ def test_update_removed_option(self, monkeypatch, setup_fake_airflow_cfg):
+ fake_config = setup_fake_airflow_cfg
+ removed_change = ConfigChange(
+ config=ConfigParameter("test_admin", "remove_key"),
+ suggestion="Option removed in Airflow 3.0.",
+ )
+ monkeypatch.setattr(config_command, "CONFIGS_CHANGES",
[removed_change])
+ assert conf.has_option("test_admin", "remove_key")
+ args = self.parser.parse_args(["config", "update"])
+ config_command.update_config(args)
+ content = fake_config.read_text()
+ assert "remove_key" not in content
+
+ def test_update_no_changes(self, monkeypatch, capsys):
+ monkeypatch.setattr(config_command, "CONFIGS_CHANGES", [])
+ args = self.parser.parse_args(["config", "update"])
+ config_command.update_config(args)
+ captured = capsys.readouterr().out
+ assert "No updates needed" in captured
+
+ def test_update_backup_creation(self, monkeypatch):
+ removed_change = ConfigChange(
+ config=ConfigParameter("test_admin", "remove_key"),
+ suggestion="Option removed.",
+ )
+ monkeypatch.setattr(config_command, "CONFIGS_CHANGES",
[removed_change])
+ assert conf.has_option("test_admin", "remove_key")
+ args = self.parser.parse_args(["config", "update"])
+ mock_copy = mock.MagicMock()
+ monkeypatch.setattr(shutil, "copy2", mock_copy)
+ config_command.update_config(args)
+ backup_path = os.environ.get("AIRFLOW_CONFIG") + ".bak"
+ mock_copy.assert_called_once_with(os.environ.get("AIRFLOW_CONFIG"),
backup_path)