This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 93c5caa2fe8 Add `airflow db-manager` CLI for managing external databases (#50657) 93c5caa2fe8 is described below commit 93c5caa2fe808bbd89738ed8c9fc52a257712e3f Author: Ephraim Anierobi <splendidzig...@gmail.com> AuthorDate: Mon Jun 2 13:53:40 2025 +0100 Add `airflow db-manager` CLI for managing external databases (#50657) * Add `airflow db-manager` CLI for managing external databases This adds a new top-level CLI command, `airflow db-manager`, which provides an interface for managing metadata databases handled by external DB managers. Previously, external DB managers were required to define and expose their own CLI commands. With this change, Airflow now provides support for common DB actions (reset, migrate, downgrade) across external database backends by delegating to their registered managers. Changes: - Introduces db-manager CLI group with reset, migrate, and downgrade commands. - Each command takes the import path of the DB manager as a positional `import_path` argument to select the DB manager. - Adds test coverage for the CLI commands. This enables external providers to plug in seamlessly without duplicating db command logic. * fixup! Add `airflow db-manager` CLI for managing external databases * apply suggestions from code review * fixup! 
apply suggestions from code review * Add back renamed file * Apply suggestions from code review Co-authored-by: Jed Cunningham <66968678+jedcunning...@users.noreply.github.com> --------- Co-authored-by: Jed Cunningham <66968678+jedcunning...@users.noreply.github.com> --- airflow-core/src/airflow/cli/cli_config.py | 64 +++++++++++++++++ .../src/airflow/cli/commands/db_manager_command.py | 61 ++++++++++++++++ airflow-core/src/airflow/utils/db_manager.py | 8 ++- .../unit/cli/commands/test_db_manager_command.py | 81 ++++++++++++++++++++++ .../providers/fab/auth_manager/models/db.py | 1 + 5 files changed, 212 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/cli/cli_config.py b/airflow-core/src/airflow/cli/cli_config.py index 97c72fdbbdf..a6b4fe40d90 100644 --- a/airflow-core/src/airflow/cli/cli_config.py +++ b/airflow-core/src/airflow/cli/cli_config.py @@ -594,6 +594,12 @@ ARG_DB_SKIP_INIT = Arg( default=False, ) +ARG_DB_MANAGER_PATH = Arg( + ("import_path",), + help="The import path of the database manager to use. ", + default=None, +) + # api-server ARG_API_SERVER_PORT = Arg( ("-p", "--port"), @@ -1725,6 +1731,59 @@ JOBS_COMMANDS = ( ), ) +DB_MANAGERS_COMMANDS = ( + ActionCommand( + name="reset", + help="Burn down and rebuild the specified external database", + func=lazy_load_command("airflow.cli.commands.db_manager_command.resetdb"), + args=(ARG_DB_MANAGER_PATH, ARG_YES, ARG_DB_SKIP_INIT, ARG_VERBOSE), + ), + ActionCommand( + name="migrate", + help="Migrates the specified external database to the latest version", + description=( + "Migrate the schema of the metadata database. " + "Create the database if it does not exist " + "To print but not execute commands, use option ``--show-sql-only``. " + "If using options ``--from-revision`` or ``--from-version``, you must also use " + "``--show-sql-only``, because if actually *running* migrations, we should only " + "migrate from the *current* Alembic revision." 
+ ), + func=lazy_load_command("airflow.cli.commands.db_manager_command.migratedb"), + args=( + ARG_DB_MANAGER_PATH, + ARG_DB_REVISION__UPGRADE, + ARG_DB_VERSION__UPGRADE, + ARG_DB_SQL_ONLY, + ARG_DB_FROM_REVISION, + ARG_DB_FROM_VERSION, + ARG_VERBOSE, + ), + ), + ActionCommand( + name="downgrade", + help="Downgrade the schema of the external metadata database.", + description=( + "Downgrade the schema of the metadata database. " + "You must provide either `--to-revision` or `--to-version`. " + "To print but not execute commands, use option `--show-sql-only`. " + "If using options `--from-revision` or `--from-version`, you must also use `--show-sql-only`, " + "because if actually *running* migrations, we should only migrate from the *current* Alembic " + "revision." + ), + func=lazy_load_command("airflow.cli.commands.db_manager_command.downgrade"), + args=( + ARG_DB_MANAGER_PATH, + ARG_DB_REVISION__DOWNGRADE, + ARG_DB_VERSION__DOWNGRADE, + ARG_DB_SQL_ONLY, + ARG_YES, + ARG_DB_FROM_REVISION, + ARG_DB_FROM_VERSION, + ARG_VERBOSE, + ), + ), +) core_commands: list[CLICommand] = [ GroupCommand( name="dags", @@ -1914,6 +1973,11 @@ core_commands: list[CLICommand] = [ func=lazy_load_command("airflow.cli.commands.standalone_command.standalone"), args=(), ), + GroupCommand( + name="db-manager", + help="Manage externally connected database managers", + subcommands=DB_MANAGERS_COMMANDS, + ), ] diff --git a/airflow-core/src/airflow/cli/commands/db_manager_command.py b/airflow-core/src/airflow/cli/commands/db_manager_command.py new file mode 100644 index 00000000000..7961ea12d02 --- /dev/null +++ b/airflow-core/src/airflow/cli/commands/db_manager_command.py @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from airflow import settings +from airflow.cli.commands.db_command import run_db_downgrade_command, run_db_migrate_command +from airflow.configuration import conf +from airflow.utils import cli as cli_utils +from airflow.utils.module_loading import import_string +from airflow.utils.providers_configuration_loader import providers_configuration_loaded + + +def _get_db_manager(classpath: str): + """Import the db manager class.""" + managers = conf.getlist("database", "external_db_managers") + if classpath not in managers: + raise SystemExit(f"DB manager {classpath} not found in configuration.") + return import_string(classpath.strip()) + + +@providers_configuration_loaded +def resetdb(args): + """Reset the metadata database.""" + db_manager = _get_db_manager(args.import_path) + if not (args.yes or input("This will drop existing tables if they exist. Proceed? 
(y/n)").upper() == "Y"): + raise SystemExit("Cancelled") + db_manager(settings.Session()).resetdb(skip_init=args.skip_init) + + +@cli_utils.action_cli(check_db=False) +@providers_configuration_loaded +def migratedb(args): + """Migrates the metadata database.""" + db_manager = _get_db_manager(args.import_path) + session = settings.Session() + upgrade_command = db_manager(session).upgradedb + run_db_migrate_command(args, upgrade_command, revision_heads_map=db_manager.revision_heads_map) + + +@cli_utils.action_cli(check_db=False) +@providers_configuration_loaded +def downgrade(args): + """Downgrades the metadata database.""" + db_manager = _get_db_manager(args.import_path) + session = settings.Session() + downgrade_command = db_manager(session).downgrade + run_db_downgrade_command(args, downgrade_command, revision_heads_map=db_manager.revision_heads_map) diff --git a/airflow-core/src/airflow/utils/db_manager.py b/airflow-core/src/airflow/utils/db_manager.py index 6483c446f01..c746cdaca08 100644 --- a/airflow-core/src/airflow/utils/db_manager.py +++ b/airflow-core/src/airflow/utils/db_manager.py @@ -43,6 +43,7 @@ class BaseDBManager(LoggingMixin): version_table_name: str # Whether the database supports dropping tables when airflow tables are dropped supports_table_dropping: bool = False + revision_heads_map: dict[str, str] = {} def __init__(self, session): super().__init__() @@ -97,6 +98,8 @@ class BaseDBManager(LoggingMixin): self.log.info("%s tables have been created from the ORM", self.__class__.__name__) def drop_tables(self, connection): + if not self.supports_table_dropping: + return self.metadata.drop_all(connection) version = self._get_migration_ctx()._version if inspect(connection).has_table(version.name): @@ -222,6 +225,5 @@ class RunDBManager(LoggingMixin): def drop_tables(self, session, connection): """Drop the external database managers.""" for manager in self._managers: - if manager.supports_table_dropping: - m = manager(session) - 
m.drop_tables(connection) + m = manager(session) + m.drop_tables(connection) diff --git a/airflow-core/tests/unit/cli/commands/test_db_manager_command.py b/airflow-core/tests/unit/cli/commands/test_db_manager_command.py new file mode 100644 index 00000000000..a5e2ae58d10 --- /dev/null +++ b/airflow-core/tests/unit/cli/commands/test_db_manager_command.py @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from unittest import mock + +import pytest + +from airflow.cli import cli_parser +from airflow.cli.commands import db_manager_command + +from tests_common.test_utils.config import conf_vars + +pytestmark = pytest.mark.db_test + + +class TestCliDbManager: + @classmethod + def setup_class(cls): + cls.parser = cli_parser.get_parser() + + @mock.patch("airflow.cli.commands.db_manager_command._get_db_manager") + def test_cli_resetdb(self, mock_get_db_manager): + manager_name = "path.to.TestDBManager" + db_manager_command.resetdb(self.parser.parse_args(["db-manager", "reset", manager_name, "--yes"])) + mock_get_db_manager.assert_called_once_with("path.to.TestDBManager") + mock_get_db_manager.return_value.resetdb.asset_called_once() + + @mock.patch("airflow.cli.commands.db_manager_command._get_db_manager") + def test_cli_resetdb_skip_init(self, mock_get_db_manager): + manager_name = "path.to.TestDBManager" + db_manager_command.resetdb( + self.parser.parse_args(["db-manager", "reset", manager_name, "--yes", "--skip-init"]) + ) + mock_get_db_manager.assert_called_once_with(manager_name) + mock_get_db_manager.return_value.resetdb.asset_called_once_with(skip_init=True) + + @mock.patch("airflow.cli.commands.db_manager_command._get_db_manager") + @mock.patch("airflow.cli.commands.db_manager_command.run_db_migrate_command") + def test_cli_migrate_db(self, mock_run_db_migrate_cmd, mock_get_db_manager): + manager_name = "path.to.TestDBManager" + db_manager_command.migratedb(self.parser.parse_args(["db-manager", "migrate", manager_name])) + mock_get_db_manager.assert_called_once_with(manager_name) + mock_run_db_migrate_cmd.assert_called_once() + + @mock.patch("airflow.cli.commands.db_manager_command._get_db_manager") + @mock.patch("airflow.cli.commands.db_manager_command.run_db_downgrade_command") + def test_cli_downgrade_db(self, mock_run_db_downgrade_cmd, mock_get_db_manager): + manager_name = "path.to.TestDBManager" + 
db_manager_command.downgrade(self.parser.parse_args(["db-manager", "downgrade", manager_name])) + mock_get_db_manager.assert_called_once_with(manager_name) + mock_run_db_downgrade_cmd.assert_called_once() + + @conf_vars({("database", "external_db_managers"): "path.to.manager.TestDBManager"}) + @mock.patch("airflow.cli.commands.db_manager_command.import_string") + def test_get_db_manager(self, mock_import_string): + manager_name = "path.to.manager.TestDBManager" + db_manager = db_manager_command._get_db_manager(manager_name) + mock_import_string.assert_called_once_with("path.to.manager.TestDBManager") + assert db_manager is not None + + @conf_vars({("database", "external_db_managers"): "path.to.manager.TestDBManager"}) + @mock.patch("airflow.cli.commands.db_manager_command.import_string") + def test_get_db_manager_raises(self, mock_import_string): + manager_name = "NonExistentDBManager" + with pytest.raises(SystemExit): + db_manager_command._get_db_manager(manager_name) diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py b/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py index 8ffb6cfeab8..1a8d66af370 100644 --- a/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py +++ b/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py @@ -53,6 +53,7 @@ class FABDBManager(BaseDBManager): migration_dir = (PACKAGE_DIR / "migrations").as_posix() alembic_file = (PACKAGE_DIR / "alembic.ini").as_posix() supports_table_dropping = True + revision_heads_map = _REVISION_HEADS_MAP def create_db_from_orm(self): super().create_db_from_orm()