This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1e87a2a72cc fix: finally delete archive table when airflow db clean fails (#46857)
1e87a2a72cc is described below
commit 1e87a2a72ccd172ba851e62f4fe4c77998b19d73
Author: Huanjie Guo <[email protected]>
AuthorDate: Fri Feb 28 18:00:22 2025 +0800
fix: finally delete archive table when airflow db clean fails (#46857)
* finally delete target table when airflow db clean fails
* finally delete target table when airflow db clean fails
* fix: format change
* fix: format change
* doc: add doc for failure in airflow db clean
* ut: add test case for failure in airflow db clean
---------
Co-authored-by: Huanjie Guo <[email protected]>
---
airflow/utils/db_cleanup.py | 79 ++++++++++++++++++---------------
docs/apache-airflow/howto/usage-cli.rst | 2 +
tests/utils/test_db_cleanup.py | 34 +++++++++++++-
3 files changed, 78 insertions(+), 37 deletions(-)
diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index 00af8ff6c8a..0efb9381a37 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -171,43 +171,50 @@ def _do_delete(*, query: Query, orm_model: Base, skip_archive: bool, session: Se
     print(f"Moving data to table {target_table_name}")
     bind = session.get_bind()
     dialect_name = bind.dialect.name
-    if dialect_name == "mysql":
-        # MySQL with replication needs this split into two queries, so just do it for all MySQL
-        # ERROR 1786 (HY000): Statement violates GTID consistency: CREATE TABLE ... SELECT.
-        session.execute(text(f"CREATE TABLE {target_table_name} LIKE {orm_model.name}"))
-        metadata = reflect_tables([target_table_name], session)
+    target_table = None
+    try:
+        if dialect_name == "mysql":
+            # MySQL with replication needs this split into two queries, so just do it for all MySQL
+            # ERROR 1786 (HY000): Statement violates GTID consistency: CREATE TABLE ... SELECT.
+            session.execute(text(f"CREATE TABLE {target_table_name} LIKE {orm_model.name}"))
+            metadata = reflect_tables([target_table_name], session)
+            target_table = metadata.tables[target_table_name]
+            insert_stm = target_table.insert().from_select(target_table.c, query)
+            logger.debug("insert statement:\n%s", insert_stm.compile())
+            session.execute(insert_stm)
+        else:
+            stmt = CreateTableAs(target_table_name, query.selectable)
+            logger.debug("ctas query:\n%s", stmt.compile())
+            session.execute(stmt)
+        session.commit()
+
+        # delete the rows from the old table
+        metadata = reflect_tables([orm_model.name, target_table_name], session)
+        source_table = metadata.tables[orm_model.name]
         target_table = metadata.tables[target_table_name]
-        insert_stm = target_table.insert().from_select(target_table.c, query)
-        logger.debug("insert statement:\n%s", insert_stm.compile())
-        session.execute(insert_stm)
-    else:
-        stmt = CreateTableAs(target_table_name, query.selectable)
-        logger.debug("ctas query:\n%s", stmt.compile())
-        session.execute(stmt)
-    session.commit()
-
-    # delete the rows from the old table
-    metadata = reflect_tables([orm_model.name, target_table_name], session)
-    source_table = metadata.tables[orm_model.name]
-    target_table = metadata.tables[target_table_name]
-    logger.debug("rows moved; purging from %s", source_table.name)
-    if dialect_name == "sqlite":
-        pk_cols = source_table.primary_key.columns
-        delete = source_table.delete().where(
-            tuple_(*pk_cols).in_(select(*[target_table.c[x.name] for x in source_table.primary_key.columns]))
-        )
-    else:
-        delete = source_table.delete().where(
-            and_(col == target_table.c[col.name] for col in source_table.primary_key.columns)
-        )
-    logger.debug("delete statement:\n%s", delete.compile())
-    session.execute(delete)
-    session.commit()
-    if skip_archive:
-        bind = session.get_bind()
-        target_table.drop(bind=bind)
-        session.commit()
-    print("Finished Performing Delete")
+        logger.debug("rows moved; purging from %s", source_table.name)
+        if dialect_name == "sqlite":
+            pk_cols = source_table.primary_key.columns
+            delete = source_table.delete().where(
+                tuple_(*pk_cols).in_(
+                    select(*[target_table.c[x.name] for x in source_table.primary_key.columns])
+                )
+            )
+        else:
+            delete = source_table.delete().where(
+                and_(col == target_table.c[col.name] for col in source_table.primary_key.columns)
+            )
+        logger.debug("delete statement:\n%s", delete.compile())
+        session.execute(delete)
+        session.commit()
+    except BaseException as e:
+        raise e
+    finally:
+        if target_table is not None and skip_archive:
+            bind = session.get_bind()
+            target_table.drop(bind=bind)
+            session.commit()
+        print("Finished Performing Delete")
 
 
 def _subquery_keep_last(
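In outline, the change moves the archive/purge steps into a try/finally block, so the transient archive table is dropped even when a later step raises. A condensed sketch of that control flow follows; `archive_rows`, `purge_rows`, and `drop_table` are illustrative placeholders, not Airflow APIs:

    def archive_and_purge(session, archive_rows, purge_rows, drop_table, skip_archive):
        # Start with no table so the finally block can tell whether the
        # CREATE step ever succeeded.
        target_table = None
        try:
            target_table = archive_rows(session)  # CREATE TABLE ... / INSERT ... SELECT
            purge_rows(session, target_table)     # DELETE the archived rows from the source
            session.commit()
        finally:
            # Runs on success and on failure: with skip_archive the archive
            # copy is transient and must not be left behind when a step raises.
            if target_table is not None and skip_archive:
                drop_table(session, target_table)
                session.commit()

Without --skip-archive the archive table is kept on purpose, even after a failure, which is what the documentation change below calls out.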
diff --git a/docs/apache-airflow/howto/usage-cli.rst b/docs/apache-airflow/howto/usage-cli.rst
index ff239cf18ad..9d7160a2212 100644
--- a/docs/apache-airflow/howto/usage-cli.rst
+++ b/docs/apache-airflow/howto/usage-cli.rst
@@ -217,6 +217,8 @@ You can use the ``--dry-run`` option to print the row counts in the primary tabl
 By default, ``db clean`` will archive purged rows in tables of the form ``_airflow_deleted__<table>__<timestamp>``. If you don't want the data preserved in this way, you may supply argument ``--skip-archive``.
+When an error occurs during a run without ``--skip-archive``, the ``_airflow_deleted__<table>__<timestamp>`` tables will still exist in the DB. You can use the ``db drop-archived`` command to manually drop these tables.
+
 
 Export the purged records from the archive tables
 -------------------------------------------------
 
 The ``db export-archived`` command exports the contents of the archived tables, created by the ``db clean`` command,
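As a usage sketch, a leftover archive from a failed run can be removed by hand. The exact flags may differ by Airflow version (check ``airflow db drop-archived --help``); the invocation below is illustrative:

    # Purge rows older than the given timestamp; if this fails partway,
    # _airflow_deleted__* archive tables may be left behind
    airflow db clean --clean-before-timestamp '2025-01-01T00:00:00+00:00' --yes

    # Drop the leftover archive tables for the affected source tables
    airflow db drop-archived --tables dag_run --yes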
diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py
index 83e3b07d80e..5bce3e95ebc 100644
--- a/tests/utils/test_db_cleanup.py
+++ b/tests/utils/test_db_cleanup.py
@@ -27,7 +27,7 @@ from uuid import uuid4
 import pendulum
 import pytest
 from sqlalchemy import text
-from sqlalchemy.exc import OperationalError
+from sqlalchemy.exc import OperationalError, SQLAlchemyError
 from sqlalchemy.ext.declarative import DeclarativeMeta
 
 from airflow.exceptions import AirflowException
@@ -36,6 +36,7 @@ from airflow.providers.standard.operators.python import PythonOperator
 from airflow.utils import timezone
 from airflow.utils.db_cleanup import (
     ARCHIVE_TABLE_PREFIX,
+    ARCHIVED_TABLES_FROM_DB_MIGRATIONS,
     CreateTableAs,
     _build_query,
     _cleanup_table,
@@ -318,6 +319,37 @@ class TestDBCleanup:
         assert len(session.query(model).all()) == 5
         assert len(_get_archived_table_names(["dag_run"], session)) == expected_archives
 
+    @patch("airflow.utils.db.reflect_tables")
+    def test_skip_archive_failure_will_remove_table(self, reflect_tables_mock):
+        """
+        Verify running _cleanup_table with skip_archive=True when a failure happens.
+
+        The archive table should be removed from the db if any exception occurs.
+        """
+        reflect_tables_mock.side_effect = SQLAlchemyError("Deletion failed")
+        base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone("UTC"))
+        num_tis = 10
+        create_tis(
+            base_date=base_date,
+            num_tis=num_tis,
+        )
+        try:
+            with create_session() as session:
+                clean_before_date = base_date.add(days=5)
+                _cleanup_table(
+                    **config_dict["dag_run"].__dict__,
+                    clean_before_timestamp=clean_before_date,
+                    dry_run=False,
+                    session=session,
+                    table_names=["dag_run"],
+                    skip_archive=True,
+                )
+        except SQLAlchemyError:
+            pass
+        archived_table_names = _get_archived_table_names(["dag_run"], session)
+        assert len(archived_table_names) == 1
+        assert archived_table_names[0] in ARCHIVED_TABLES_FROM_DB_MIGRATIONS
+
     def test_no_models_missing(self):
         """
         1. Verify that for all tables in `airflow.models`, we either have them enabled in db cleanup,
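The test above exercises a pattern that generalizes well: inject a failure mid-operation with a mock side_effect, swallow the expected exception, and assert that the finally-block cleanup still ran. A minimal self-contained sketch of the same pattern (illustrative names, not the Airflow test itself):

    from unittest import mock

    import pytest


    def copy_then_purge(copy, purge, drop):
        # Copy rows aside, purge the source, and always drop the temp copy
        # once it exists, even when purging raises.
        made_copy = False
        try:
            copy()
            made_copy = True
            purge()  # may raise
        finally:
            if made_copy:
                drop()


    def test_drop_runs_even_when_purge_fails():
        copy = mock.Mock()
        purge = mock.Mock(side_effect=RuntimeError("purge failed"))
        drop = mock.Mock()
        with pytest.raises(RuntimeError):
            copy_then_purge(copy, purge, drop)
        drop.assert_called_once()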