This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e87a2a72cc fix: finally delete archive table when airflow clean fail (#46857)
1e87a2a72cc is described below

commit 1e87a2a72ccd172ba851e62f4fe4c77998b19d73
Author: Huanjie Guo <[email protected]>
AuthorDate: Fri Feb 28 18:00:22 2025 +0800

    fix: finally delete archive table when airflow clean fail (#46857)
    
    * finally delete target table when airflow clean fail
    
    * finally delete target table when airflow clean fail
    
    * fix: format change
    
    * fix: format change
    
    * doc: add doc for failure in airflow db clean
    
    * ut: add case for failure in airflow db clean
    
    ---------
    
    Co-authored-by: Huanjie Guo <[email protected]>
---
 airflow/utils/db_cleanup.py             | 79 ++++++++++++++++++---------------
 docs/apache-airflow/howto/usage-cli.rst |  2 +
 tests/utils/test_db_cleanup.py          | 34 +++++++++++++-
 3 files changed, 78 insertions(+), 37 deletions(-)

diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index 00af8ff6c8a..0efb9381a37 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -171,43 +171,50 @@ def _do_delete(*, query: Query, orm_model: Base, skip_archive: bool, session: Se
     print(f"Moving data to table {target_table_name}")
     bind = session.get_bind()
     dialect_name = bind.dialect.name
-    if dialect_name == "mysql":
-        # MySQL with replication needs this split into two queries, so just do it for all MySQL
-        # ERROR 1786 (HY000): Statement violates GTID consistency: CREATE TABLE ... SELECT.
-        session.execute(text(f"CREATE TABLE {target_table_name} LIKE {orm_model.name}"))
-        metadata = reflect_tables([target_table_name], session)
+    target_table = None
+    try:
+        if dialect_name == "mysql":
+            # MySQL with replication needs this split into two queries, so just do it for all MySQL
+            # ERROR 1786 (HY000): Statement violates GTID consistency: CREATE TABLE ... SELECT.
+            session.execute(text(f"CREATE TABLE {target_table_name} LIKE {orm_model.name}"))
+            metadata = reflect_tables([target_table_name], session)
+            target_table = metadata.tables[target_table_name]
+            insert_stm = target_table.insert().from_select(target_table.c, query)
+            logger.debug("insert statement:\n%s", insert_stm.compile())
+            session.execute(insert_stm)
+        else:
+            stmt = CreateTableAs(target_table_name, query.selectable)
+            logger.debug("ctas query:\n%s", stmt.compile())
+            session.execute(stmt)
+        session.commit()
+
+        # delete the rows from the old table
+        metadata = reflect_tables([orm_model.name, target_table_name], session)
+        source_table = metadata.tables[orm_model.name]
         target_table = metadata.tables[target_table_name]
-        insert_stm = target_table.insert().from_select(target_table.c, query)
-        logger.debug("insert statement:\n%s", insert_stm.compile())
-        session.execute(insert_stm)
-    else:
-        stmt = CreateTableAs(target_table_name, query.selectable)
-        logger.debug("ctas query:\n%s", stmt.compile())
-        session.execute(stmt)
-    session.commit()
-
-    # delete the rows from the old table
-    metadata = reflect_tables([orm_model.name, target_table_name], session)
-    source_table = metadata.tables[orm_model.name]
-    target_table = metadata.tables[target_table_name]
-    logger.debug("rows moved; purging from %s", source_table.name)
-    if dialect_name == "sqlite":
-        pk_cols = source_table.primary_key.columns
-        delete = source_table.delete().where(
-            tuple_(*pk_cols).in_(select(*[target_table.c[x.name] for x in source_table.primary_key.columns]))
-        )
-    else:
-        delete = source_table.delete().where(
-            and_(col == target_table.c[col.name] for col in source_table.primary_key.columns)
-        )
-    logger.debug("delete statement:\n%s", delete.compile())
-    session.execute(delete)
-    session.commit()
-    if skip_archive:
-        bind = session.get_bind()
-        target_table.drop(bind=bind)
-    session.commit()
-    print("Finished Performing Delete")
+        logger.debug("rows moved; purging from %s", source_table.name)
+        if dialect_name == "sqlite":
+            pk_cols = source_table.primary_key.columns
+            delete = source_table.delete().where(
+                tuple_(*pk_cols).in_(
+                    select(*[target_table.c[x.name] for x in source_table.primary_key.columns])
+                )
+            )
+        else:
+            delete = source_table.delete().where(
+                and_(col == target_table.c[col.name] for col in source_table.primary_key.columns)
+            )
+        logger.debug("delete statement:\n%s", delete.compile())
+        session.execute(delete)
+        session.commit()
+    except BaseException as e:
+        raise e
+    finally:
+        if target_table is not None and skip_archive:
+            bind = session.get_bind()
+            target_table.drop(bind=bind)
+            session.commit()
+            print("Finished Performing Delete")
 
 
 def _subquery_keep_last(
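
The heart of the change above: the archive-and-purge sequence now runs inside try/finally, so with --skip-archive the temporary archive table is dropped even when the purge raises. (The except BaseException/raise e pair only re-raises, so plain try/finally gives the same behavior; the sketch below omits it.) This is a minimal, self-contained sketch of the pattern, not the committed code: it assumes an in-memory SQLite engine and throwaway table names src/archive, and uses the CTAS branch; on MySQL the commit instead issues CREATE TABLE ... LIKE plus INSERT ... SELECT to avoid the GTID-consistency error noted in the diff's comment.

    from sqlalchemy import create_engine, text

    engine = create_engine("sqlite://")

    def archive_then_purge(skip_archive: bool) -> None:
        # Seed a throwaway source table (placeholder names, not Airflow's).
        with engine.begin() as conn:
            conn.execute(text("CREATE TABLE src (id INTEGER PRIMARY KEY)"))
            conn.execute(text("INSERT INTO src VALUES (1), (2)"))
        target_created = False
        try:
            with engine.begin() as conn:
                # CTAS archive, as in the non-MySQL branch of the diff.
                conn.execute(text("CREATE TABLE archive AS SELECT * FROM src"))
                target_created = True
                # If this purge raises, the finally below still cleans up.
                conn.execute(text("DELETE FROM src WHERE id IN (SELECT id FROM archive)"))
        finally:
            # Mirrors the commit: drop the archive even on failure, but only
            # if it was actually created and the caller opted out of archiving.
            if target_created and skip_archive:
                with engine.begin() as conn:
                    conn.execute(text("DROP TABLE archive"))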
diff --git a/docs/apache-airflow/howto/usage-cli.rst b/docs/apache-airflow/howto/usage-cli.rst
index ff239cf18ad..9d7160a2212 100644
--- a/docs/apache-airflow/howto/usage-cli.rst
+++ b/docs/apache-airflow/howto/usage-cli.rst
@@ -217,6 +217,8 @@ You can use the ``--dry-run`` option to print the row counts in the primary tabl
 
 By default, ``db clean`` will archive purged rows in tables of the form ``_airflow_deleted__<table>__<timestamp>``.  If you don't want the data preserved in this way, you may supply argument ``--skip-archive``.
 
+If an error occurs while running ``db clean`` without ``--skip-archive``, the ``_airflow_deleted__<table>__<timestamp>`` tables may still exist in the DB. You can use the ``db drop-archived`` command to manually drop them.
+
 Export the purged records from the archive tables
 -------------------------------------------------
 The ``db export-archived`` command exports the contents of the archived tables, created by the ``db clean`` command,
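
If you want to check for stranded archive tables programmatically rather than from the CLI, something like the following works. It is a sketch, not an Airflow API; the connection URL is a placeholder, while the ``_airflow_deleted__`` prefix comes from the doc text above.

    from sqlalchemy import create_engine, inspect

    # Placeholder URL; point this at your Airflow metadata database.
    engine = create_engine("postgresql://user:pass@localhost/airflow")
    leftovers = [
        name
        for name in inspect(engine).get_table_names()
        if name.startswith("_airflow_deleted__")
    ]
    # Anything listed here can be removed with `airflow db drop-archived`.
    print(leftovers)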
diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py
index 83e3b07d80e..5bce3e95ebc 100644
--- a/tests/utils/test_db_cleanup.py
+++ b/tests/utils/test_db_cleanup.py
@@ -27,7 +27,7 @@ from uuid import uuid4
 import pendulum
 import pytest
 from sqlalchemy import text
-from sqlalchemy.exc import OperationalError
+from sqlalchemy.exc import OperationalError, SQLAlchemyError
 from sqlalchemy.ext.declarative import DeclarativeMeta
 
 from airflow.exceptions import AirflowException
@@ -36,6 +36,7 @@ from airflow.providers.standard.operators.python import PythonOperator
 from airflow.utils import timezone
 from airflow.utils.db_cleanup import (
     ARCHIVE_TABLE_PREFIX,
+    ARCHIVED_TABLES_FROM_DB_MIGRATIONS,
     CreateTableAs,
     _build_query,
     _cleanup_table,
@@ -318,6 +319,37 @@ class TestDBCleanup:
             assert len(session.query(model).all()) == 5
             assert len(_get_archived_table_names(["dag_run"], session)) == expected_archives
 
+    @patch("airflow.utils.db.reflect_tables")
+    def test_skip_archive_failure_will_remove_table(self, reflect_tables_mock):
+        """
+        Verify that running _cleanup_table with skip_archive=True handles a failure.
+
+        The archive table should be removed from the db if any exception is raised.
+        """
+        reflect_tables_mock.side_effect = SQLAlchemyError("Deletion failed")
+        base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone("UTC"))
+        num_tis = 10
+        create_tis(
+            base_date=base_date,
+            num_tis=num_tis,
+        )
+        try:
+            with create_session() as session:
+                clean_before_date = base_date.add(days=5)
+                _cleanup_table(
+                    **config_dict["dag_run"].__dict__,
+                    clean_before_timestamp=clean_before_date,
+                    dry_run=False,
+                    session=session,
+                    table_names=["dag_run"],
+                    skip_archive=True,
+                )
+        except SQLAlchemyError:
+            pass
+        archived_table_names = _get_archived_table_names(["dag_run"], session)
+        assert len(archived_table_names) == 1
+        assert archived_table_names[0] in ARCHIVED_TABLES_FROM_DB_MIGRATIONS
+
     def test_no_models_missing(self):
         """
         1. Verify that for all tables in `airflow.models`, we either have them enabled in db cleanup,
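
The new test's failure injection is worth isolating: patch a collaborator with side_effect so it raises, then assert on what was left behind. A stripped-down version of the pattern, with hypothetical names (purge, loader) standing in for the Airflow helpers:

    from unittest import mock

    import pytest
    from sqlalchemy.exc import SQLAlchemyError

    def purge(loader):
        loader()  # stands in for reflect_tables() failing mid-cleanup

    def test_purge_surfaces_db_errors():
        loader = mock.Mock(side_effect=SQLAlchemyError("Deletion failed"))
        with pytest.raises(SQLAlchemyError):
            purge(loader)
        loader.assert_called_once()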
