bugraoz93 commented on code in PR #46857:
URL: https://github.com/apache/airflow/pull/46857#discussion_r1965876726
##########
airflow/utils/db_cleanup.py:
##########
@@ -170,43 +170,50 @@ def _do_delete(*, query: Query, orm_model: Base,
skip_archive: bool, session: Se
print(f"Moving data to table {target_table_name}")
bind = session.get_bind()
dialect_name = bind.dialect.name
- if dialect_name == "mysql":
- # MySQL with replication needs this split into two queries, so just do
it for all MySQL
- # ERROR 1786 (HY000): Statement violates GTID consistency: CREATE
TABLE ... SELECT.
- session.execute(text(f"CREATE TABLE {target_table_name} LIKE
{orm_model.name}"))
- metadata = reflect_tables([target_table_name], session)
+ target_table = None
+ try:
+ if dialect_name == "mysql":
+ # MySQL with replication needs this split into two queries, so
just do it for all MySQL
+ # ERROR 1786 (HY000): Statement violates GTID consistency: CREATE
TABLE ... SELECT.
+ session.execute(text(f"CREATE TABLE {target_table_name} LIKE
{orm_model.name}"))
+ metadata = reflect_tables([target_table_name], session)
+ target_table = metadata.tables[target_table_name]
+ insert_stm = target_table.insert().from_select(target_table.c,
query)
+ logger.debug("insert statement:\n%s", insert_stm.compile())
+ session.execute(insert_stm)
+ else:
+ stmt = CreateTableAs(target_table_name, query.selectable)
+ logger.debug("ctas query:\n%s", stmt.compile())
+ session.execute(stmt)
+ session.commit()
+
+ # delete the rows from the old table
+ metadata = reflect_tables([orm_model.name, target_table_name], session)
+ source_table = metadata.tables[orm_model.name]
target_table = metadata.tables[target_table_name]
- insert_stm = target_table.insert().from_select(target_table.c, query)
- logger.debug("insert statement:\n%s", insert_stm.compile())
- session.execute(insert_stm)
- else:
- stmt = CreateTableAs(target_table_name, query.selectable)
- logger.debug("ctas query:\n%s", stmt.compile())
- session.execute(stmt)
- session.commit()
-
- # delete the rows from the old table
- metadata = reflect_tables([orm_model.name, target_table_name], session)
- source_table = metadata.tables[orm_model.name]
- target_table = metadata.tables[target_table_name]
- logger.debug("rows moved; purging from %s", source_table.name)
- if dialect_name == "sqlite":
- pk_cols = source_table.primary_key.columns
- delete = source_table.delete().where(
- tuple_(*pk_cols).in_(select(*[target_table.c[x.name] for x in
source_table.primary_key.columns]))
- )
- else:
- delete = source_table.delete().where(
- and_(col == target_table.c[col.name] for col in
source_table.primary_key.columns)
- )
- logger.debug("delete statement:\n%s", delete.compile())
- session.execute(delete)
- session.commit()
- if skip_archive:
- bind = session.get_bind()
- target_table.drop(bind=bind)
- session.commit()
- print("Finished Performing Delete")
+ logger.debug("rows moved; purging from %s", source_table.name)
+ if dialect_name == "sqlite":
+ pk_cols = source_table.primary_key.columns
+ delete = source_table.delete().where(
+ tuple_(*pk_cols).in_(
+ select(*[target_table.c[x.name] for x in
source_table.primary_key.columns])
+ )
+ )
+ else:
+ delete = source_table.delete().where(
+ and_(col == target_table.c[col.name] for col in
source_table.primary_key.columns)
+ )
+ logger.debug("delete statement:\n%s", delete.compile())
+ session.execute(delete)
+ session.commit()
+ except BaseException as e:
+ raise e
+ finally:
+ if target_table is not None and skip_archive:
Review Comment:
If we go with this behaviour, we should update the documentation and warn
users. This is not strictly a breaking change, but those tables can then only
be deleted manually after a migration, so anyone who runs the cleanup could be
surprised. :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]