HuanjieGuo commented on code in PR #46857:
URL: https://github.com/apache/airflow/pull/46857#discussion_r1973998650


##########
airflow/utils/db_cleanup.py:
##########
@@ -170,43 +170,50 @@ def _do_delete(*, query: Query, orm_model: Base, skip_archive: bool, session: Se
     print(f"Moving data to table {target_table_name}")
     bind = session.get_bind()
     dialect_name = bind.dialect.name
-    if dialect_name == "mysql":
-        # MySQL with replication needs this split into two queries, so just do it for all MySQL
-        # ERROR 1786 (HY000): Statement violates GTID consistency: CREATE TABLE ... SELECT.
-        session.execute(text(f"CREATE TABLE {target_table_name} LIKE {orm_model.name}"))
-        metadata = reflect_tables([target_table_name], session)
+    target_table = None
+    try:
+        if dialect_name == "mysql":
+            # MySQL with replication needs this split into two queries, so just do it for all MySQL
+            # ERROR 1786 (HY000): Statement violates GTID consistency: CREATE TABLE ... SELECT.
+            session.execute(text(f"CREATE TABLE {target_table_name} LIKE {orm_model.name}"))
+            metadata = reflect_tables([target_table_name], session)
+            target_table = metadata.tables[target_table_name]
+            insert_stm = target_table.insert().from_select(target_table.c, query)
+            logger.debug("insert statement:\n%s", insert_stm.compile())
+            session.execute(insert_stm)
+        else:
+            stmt = CreateTableAs(target_table_name, query.selectable)
+            logger.debug("ctas query:\n%s", stmt.compile())
+            session.execute(stmt)
+        session.commit()
+
+        # delete the rows from the old table
+        metadata = reflect_tables([orm_model.name, target_table_name], session)
+        source_table = metadata.tables[orm_model.name]
         target_table = metadata.tables[target_table_name]
-        insert_stm = target_table.insert().from_select(target_table.c, query)
-        logger.debug("insert statement:\n%s", insert_stm.compile())
-        session.execute(insert_stm)
-    else:
-        stmt = CreateTableAs(target_table_name, query.selectable)
-        logger.debug("ctas query:\n%s", stmt.compile())
-        session.execute(stmt)
-    session.commit()
-
-    # delete the rows from the old table
-    metadata = reflect_tables([orm_model.name, target_table_name], session)
-    source_table = metadata.tables[orm_model.name]
-    target_table = metadata.tables[target_table_name]
-    logger.debug("rows moved; purging from %s", source_table.name)
-    if dialect_name == "sqlite":
-        pk_cols = source_table.primary_key.columns
-        delete = source_table.delete().where(
-            tuple_(*pk_cols).in_(select(*[target_table.c[x.name] for x in source_table.primary_key.columns]))
-        )
-    else:
-        delete = source_table.delete().where(
-            and_(col == target_table.c[col.name] for col in source_table.primary_key.columns)
-        )
-    logger.debug("delete statement:\n%s", delete.compile())
-    session.execute(delete)
-    session.commit()
-    if skip_archive:
-        bind = session.get_bind()
-        target_table.drop(bind=bind)
-    session.commit()
-    print("Finished Performing Delete")
+        logger.debug("rows moved; purging from %s", source_table.name)
+        if dialect_name == "sqlite":
+            pk_cols = source_table.primary_key.columns
+            delete = source_table.delete().where(
+                tuple_(*pk_cols).in_(
+                    select(*[target_table.c[x.name] for x in source_table.primary_key.columns])
+                )
+            )
+        else:
+            delete = source_table.delete().where(
+                and_(col == target_table.c[col.name] for col in source_table.primary_key.columns)
+            )
+        logger.debug("delete statement:\n%s", delete.compile())
+        session.execute(delete)
+        session.commit()
+    except BaseException as e:

Review Comment:
   Added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to