This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a76e0fe16e Add `airflow db drop-archived` command (#29309)
a76e0fe16e is described below
commit a76e0fe16ef12749c3fea1b68d82936b238fafbb
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Thu Feb 9 22:26:29 2023 +0100
Add `airflow db drop-archived` command (#29309)
* Add `airflow db drop-archived` command
This command drops the archive tables directly
As part of this, the _confirm_drop_archives function was made more
interactive
* fixup! Add `airflow db drop-archived` command
* Fix test and add doc
* Apply suggestions from code review
Co-authored-by: Jed Cunningham
<[email protected]>
---
airflow/cli/cli_parser.py | 6 ++
airflow/cli/commands/db_command.py | 11 +++-
airflow/utils/db_cleanup.py | 58 ++++++++++++++----
docs/apache-airflow/howto/usage-cli.rst | 10 ++++
tests/cli/commands/test_db_command.py | 28 +++++++++
tests/utils/test_db_cleanup.py | 100 +++++++++++++++++++++++++++++---
6 files changed, 191 insertions(+), 22 deletions(-)
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 9a82d2f06e..bf2e98d0f3 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -1626,6 +1626,12 @@ DB_COMMANDS = (
ARG_DB_TABLES,
),
),
+ ActionCommand(
+ name="drop-archived",
+ help="Drop archived tables created through the db clean command",
+
func=lazy_load_command("airflow.cli.commands.db_command.drop_archived"),
+ args=(ARG_DB_TABLES, ARG_YES),
+ ),
)
CONNECTIONS_COMMANDS = (
ActionCommand(
diff --git a/airflow/cli/commands/db_command.py
b/airflow/cli/commands/db_command.py
index 468aa3d87f..72ee55c86d 100644
--- a/airflow/cli/commands/db_command.py
+++ b/airflow/cli/commands/db_command.py
@@ -27,7 +27,7 @@ from airflow import settings
from airflow.exceptions import AirflowException
from airflow.utils import cli as cli_utils, db
from airflow.utils.db import REVISION_HEADS_MAP
-from airflow.utils.db_cleanup import config_dict, export_cleaned_records,
run_cleanup
+from airflow.utils.db_cleanup import config_dict, drop_archived_tables,
export_cleaned_records, run_cleanup
from airflow.utils.process_utils import execute_interactive
@@ -218,3 +218,12 @@ def export_cleaned(args):
table_names=args.tables,
drop_archives=args.drop_archives,
)
+
+
+@cli_utils.action_cli(check_db=False)
+def drop_archived(args):
+ """Drops archived tables from metadata database."""
+ drop_archived_tables(
+ table_names=args.tables,
+ needs_confirm=not args.yes,
+ )
diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index a9d12c7aa3..41b89931f5 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -39,6 +39,7 @@ from airflow.cli.simple_table import AirflowConsole
from airflow.models import Base
from airflow.utils import timezone
from airflow.utils.db import reflect_tables
+from airflow.utils.helpers import ask_yesno
from airflow.utils.session import NEW_SESSION, provide_session
logger = logging.getLogger(__file__)
@@ -301,13 +302,21 @@ def _confirm_delete(*, date: DateTime, tables: list[str]):
def _confirm_drop_archives(*, tables: list[str]):
+ # if length of tables is greater than 3, show the total count
+ if len(tables) > 3:
+ text_ = f"{len(tables)} archived tables prefixed with
{ARCHIVE_TABLE_PREFIX}"
+ else:
+ text_ = f"the following archived tables {tables}"
question = (
- f"You have requested that we drop archived records for tables
{tables!r}.\n"
- f"This is irreversible. Consider backing up the tables first \n"
- f"Enter 'drop archived tables' (without quotes) to proceed."
+ f"You have requested that we drop {text_}.\n"
+ f"This is irreversible. Consider backing up the tables first \n"
)
print(question)
- answer = input().strip()
+ if len(tables) > 3:
+ show_tables = ask_yesno("Show tables? (y/n): ")
+ if show_tables:
+ print(tables, "\n")
+ answer = input("Enter 'drop archived tables' (without quotes) to
proceed.\n").strip()
if not answer == "drop archived tables":
raise SystemExit("User did not confirm; exiting.")
@@ -347,6 +356,19 @@ def _effective_table_names(*, table_names: list[str] |
None):
return effective_table_names, effective_config_dict
+def _get_archived_table_names(table_names, session):
+ inspector = inspect(session.bind)
+ db_table_names = [x for x in inspector.get_table_names() if
x.startswith(ARCHIVE_TABLE_PREFIX)]
+ effective_table_names, _ = _effective_table_names(table_names=table_names)
+ # Filter out tables that don't start with the archive prefix
+ archived_table_names = [
+ table_name
+ for table_name in db_table_names
+ if any("__" + x + "__" in table_name for x in effective_table_names)
+ ]
+ return archived_table_names
+
+
@provide_session
def run_cleanup(
*,
@@ -410,16 +432,14 @@ def export_cleaned_records(
export_format, output_path, table_names=None, drop_archives=False,
session: Session = NEW_SESSION
):
"""Export cleaned data to the given output path in the given format."""
- effective_table_names, _ = _effective_table_names(table_names=table_names)
- if drop_archives:
- _confirm_drop_archives(tables=sorted(effective_table_names))
- inspector = inspect(session.bind)
- db_table_names = [x for x in inspector.get_table_names() if
x.startswith(ARCHIVE_TABLE_PREFIX)]
+ archived_table_names = _get_archived_table_names(table_names, session)
+ # If user chose to drop archives, check there are archive tables that
exists
+ # before asking for confirmation
+ if drop_archives and archived_table_names:
+ _confirm_drop_archives(tables=sorted(archived_table_names))
export_count = 0
dropped_count = 0
- for table_name in db_table_names:
- if not any("__" + x + "__" in table_name for x in
effective_table_names):
- continue
+ for table_name in archived_table_names:
logger.info("Exporting table %s", table_name)
_dump_table_to_file(
target_table=table_name,
@@ -433,3 +453,17 @@ def export_cleaned_records(
session.execute(text(f"DROP TABLE {table_name}"))
dropped_count += 1
logger.info("Total exported tables: %s, Total dropped tables: %s",
export_count, dropped_count)
+
+
+@provide_session
+def drop_archived_tables(table_names, needs_confirm, session):
+ """Drop archived tables."""
+ archived_table_names = _get_archived_table_names(table_names, session)
+ if needs_confirm and archived_table_names:
+ _confirm_drop_archives(tables=sorted(archived_table_names))
+ dropped_count = 0
+ for table_name in archived_table_names:
+ logger.info("Dropping archived table %s", table_name)
+ session.execute(text(f"DROP TABLE {table_name}"))
+ dropped_count += 1
+ logger.info("Total dropped tables: %s", dropped_count)
diff --git a/docs/apache-airflow/howto/usage-cli.rst
b/docs/apache-airflow/howto/usage-cli.rst
index 976b0b938f..cd66d7e0cb 100644
--- a/docs/apache-airflow/howto/usage-cli.rst
+++ b/docs/apache-airflow/howto/usage-cli.rst
@@ -232,6 +232,16 @@ location must exist.
Other options include: ``--tables`` to specify the tables to export,
``--drop-archives`` to drop the archive tables after
exporting.
+Dropping the archived tables
+----------------------------
+
+If you ran ``db clean`` without the ``--skip-archive`` option (which drops the archived tables), the archive tables
+remain in the database; you can still drop them later using the ``db drop-archived`` command. This operation is
+irreversible, so you are encouraged to use the ``db export-cleaned`` command to back up the tables to disk before dropping them.
+
+You can specify the tables to drop using the ``--tables`` option. If no tables
are specified, all archive tables will be
+dropped.
+
Beware cascading deletes
^^^^^^^^^^^^^^^^^^^^^^^^
diff --git a/tests/cli/commands/test_db_command.py
b/tests/cli/commands/test_db_command.py
index 78f873b65c..f91a7767ad 100644
--- a/tests/cli/commands/test_db_command.py
+++ b/tests/cli/commands/test_db_command.py
@@ -507,3 +507,31 @@ class TestCLIDBClean:
export_archived_mock.assert_called_once_with(
export_format="csv", output_path="path", table_names=None,
drop_archives=expected
)
+
+ @pytest.mark.parametrize(
+ "extra_args, expected", [(["--tables", "hello, goodbye"], ["hello",
"goodbye"]), ([], None)]
+ )
+ @patch("airflow.cli.commands.db_command.drop_archived_tables")
+ def test_tables_in_drop_archived_records_command(self,
mock_drop_archived_records, extra_args, expected):
+ args = self.parser.parse_args(
+ [
+ "db",
+ "drop-archived",
+ *extra_args,
+ ]
+ )
+ db_command.drop_archived(args)
+
mock_drop_archived_records.assert_called_once_with(table_names=expected,
needs_confirm=True)
+
+ @pytest.mark.parametrize("extra_args, expected", [(["-y"], False), ([],
True)])
+ @patch("airflow.cli.commands.db_command.drop_archived_tables")
+ def test_confirm_in_drop_archived_records_command(self,
mock_drop_archived_records, extra_args, expected):
+ args = self.parser.parse_args(
+ [
+ "db",
+ "drop-archived",
+ *extra_args,
+ ]
+ )
+ db_command.drop_archived(args)
+ mock_drop_archived_records.assert_called_once_with(table_names=None,
needs_confirm=expected)
diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py
index f72630888a..7d36e37ced 100644
--- a/tests/utils/test_db_cleanup.py
+++ b/tests/utils/test_db_cleanup.py
@@ -42,6 +42,7 @@ from airflow.utils.db_cleanup import (
_confirm_drop_archives,
_dump_table_to_file,
config_dict,
+ drop_archived_tables,
export_cleaned_records,
run_cleanup,
)
@@ -336,27 +337,53 @@ class TestDBCleanup:
"drop_archive",
[True, False],
)
+ @patch("airflow.utils.db_cleanup._dump_table_to_file")
@patch("airflow.utils.db_cleanup._confirm_drop_archives")
- def test_confirm_drop_called_when_drop_archives_is_true(self,
confirm_drop_mock, drop_archive):
+ @patch("airflow.utils.db_cleanup.inspect")
+ def test_confirm_drop_called_when_drop_archives_is_true_and_archive_exists(
+ self, inspect_mock, confirm_drop_mock, _dump_table_to_file_mock,
drop_archive
+ ):
"""test that drop confirmation input is called when appropriate"""
- export_cleaned_records(export_format="csv", output_path="path",
drop_archives=drop_archive)
+ inspector = inspect_mock.return_value
+ inspector.get_table_names.return_value =
[f"{ARCHIVE_TABLE_PREFIX}dag_run__233"]
+ export_cleaned_records(
+ export_format="csv", output_path="path",
drop_archives=drop_archive, session=MagicMock()
+ )
if drop_archive:
confirm_drop_mock.assert_called()
else:
confirm_drop_mock.assert_not_called()
- def test_confirm_drop_archives(self):
- tables = ["table1", "table2"]
+ @pytest.mark.parametrize(
+ "tables",
+ [
+ ["table1", "table2"],
+ ["table1", "table2", "table3"],
+ ["table1", "table2", "table3", "table4"],
+ ],
+ )
+ @patch("airflow.utils.db_cleanup.ask_yesno")
+ def test_confirm_drop_archives(self, mock_ask_yesno, tables):
+ expected = (
+ f"You have requested that we drop the following archived tables
{tables}.\n"
+ "This is irreversible. Consider backing up the tables first"
+ )
+ if len(tables) > 3:
+ expected = (
+ f"You have requested that we drop {len(tables)} archived
tables prefixed with "
+ f"_airflow_deleted__.\n"
+ "This is irreversible. Consider backing up the tables first \n"
+ "\n"
+ f"{tables}"
+ )
+
+ mock_ask_yesno.return_value = True
with patch("sys.stdout", new=StringIO()) as fake_out, patch(
"builtins.input", side_effect=["drop archived tables"]
):
_confirm_drop_archives(tables=tables)
output = fake_out.getvalue().strip()
- expected = (
- f"You have requested that we drop archived records for tables
{tables}.\n"
- "This is irreversible. Consider backing up the tables first
\n"
- "Enter 'drop archived tables' (without quotes) to proceed."
- )
+
assert output == expected
def test_user_did_not_confirm(self):
@@ -394,6 +421,26 @@ class TestDBCleanup:
else:
assert "Total exported tables: 1, Total dropped tables: 0" in
caplog.text
+ @pytest.mark.parametrize("drop_archive", [True, False])
+ @patch("airflow.utils.db_cleanup._dump_table_to_file")
+ @patch("airflow.utils.db_cleanup.inspect")
+ @patch("airflow.utils.db_cleanup._confirm_drop_archives")
+ @patch("builtins.input", side_effect=["drop archived tables"])
+ def test_export_cleaned_no_confirm_if_no_tables(
+ self, mock_input, mock_confirm, inspect_mock, dump_mock, caplog,
drop_archive
+ ):
+ """Test no confirmation if no archived tables found"""
+ session_mock = MagicMock()
+ inspector = inspect_mock.return_value
+ # No tables with the archive prefix
+ inspector.get_table_names.return_value = ["dag_run", "task_instance"]
+ export_cleaned_records(
+ export_format="csv", output_path="path",
drop_archives=drop_archive, session=session_mock
+ )
+ mock_confirm.assert_not_called()
+ dump_mock.assert_not_called()
+ assert "Total exported tables: 0, Total dropped tables: 0" in
caplog.text
+
@patch("airflow.utils.db_cleanup.csv")
def test_dump_table_to_file_function_for_csv(self, mock_csv):
mockopen = mock_open()
@@ -417,6 +464,41 @@ class TestDBCleanup:
)
assert "Export format json is not supported" in str(exc_info.value)
+ @pytest.mark.parametrize("tables", [["log", "dag"], ["dag_run",
"task_instance"]])
+ @patch("airflow.utils.db_cleanup._confirm_drop_archives")
+ @patch("airflow.utils.db_cleanup.inspect")
+ def test_drop_archived_tables_no_confirm_if_no_archived_tables(
+ self, inspect_mock, mock_confirm, tables, caplog
+ ):
+ """
+ Test no confirmation if no archived tables found.
+ Archived tables starts with a prefix defined in ARCHIVE_TABLE_PREFIX.
+ """
+ inspector = inspect_mock.return_value
+ inspector.get_table_names.return_value = tables
+ drop_archived_tables(tables, needs_confirm=True, session=MagicMock())
+ mock_confirm.assert_not_called()
+ assert "Total dropped tables: 0" in caplog.text
+
+ @pytest.mark.parametrize("confirm", [True, False])
+ @patch("airflow.utils.db_cleanup.inspect")
+ @patch("airflow.utils.db_cleanup._confirm_drop_archives")
+ @patch("builtins.input", side_effect=["drop archived tables"])
+ def test_drop_archived_tables(self, mock_input, confirm_mock,
inspect_mock, caplog, confirm):
+ """Test drop_archived_tables"""
+ archived_table = f"{ARCHIVE_TABLE_PREFIX}dag_run__233"
+ normal_table = "dag_run"
+ inspector = inspect_mock.return_value
+ inspector.get_table_names.return_value = [archived_table, normal_table]
+ drop_archived_tables([normal_table], needs_confirm=confirm,
session=MagicMock())
+ assert f"Dropping archived table {archived_table}" in caplog.text
+ assert f"Dropping archived table {normal_table}" not in caplog.text
+ assert "Total dropped tables: 1" in caplog.text
+ if confirm:
+ confirm_mock.assert_called()
+ else:
+ confirm_mock.assert_not_called()
+
def create_tis(base_date, num_tis, external_trigger=False):
with create_session() as session: