This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 78cc2e89e5 Fix db clean command for mysql db (#29999)
78cc2e89e5 is described below
commit 78cc2e89e5d46738664b7442dc6f5a00b23d1ef5
Author: Vu Tan <[email protected]>
AuthorDate: Mon Mar 13 22:58:02 2023 +0900
Fix db clean command for mysql db (#29999)
---
airflow/utils/db_cleanup.py | 20 +++++++++++++++-----
1 file changed, 15 insertions(+), 5 deletions(-)
diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index 7ce73dd941..03b233f740 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -150,9 +150,21 @@ def _do_delete(*, query, orm_model, skip_archive, session):
timestamp_str = re.sub(r"[^\d]", "", datetime.utcnow().isoformat())[:14]
target_table_name =
f"{ARCHIVE_TABLE_PREFIX}{orm_model.name}__{timestamp_str}"
print(f"Moving data to table {target_table_name}")
- stmt = CreateTableAs(target_table_name, query.selectable)
- logger.debug("ctas query:\n%s", stmt.compile())
- session.execute(stmt)
+ bind = session.get_bind()
+ dialect_name = bind.dialect.name
+ if dialect_name == "mysql":
+ # MySQL with replication needs this split into two queries, so just do
it for all MySQL
+ # ERROR 1786 (HY000): Statement violates GTID consistency: CREATE
TABLE ... SELECT.
+ session.execute(f"CREATE TABLE {target_table_name} LIKE
{orm_model.name}")
+ metadata = reflect_tables([target_table_name], session)
+ target_table = metadata.tables[target_table_name]
+ insert_stm = target_table.insert().from_select(target_table.c, query)
+ logger.debug("insert statement:\n%s", insert_stm.compile())
+ session.execute(insert_stm)
+ else:
+ stmt = CreateTableAs(target_table_name, query.selectable)
+ logger.debug("ctas query:\n%s", stmt.compile())
+ session.execute(stmt)
session.commit()
# delete the rows from the old table
@@ -160,8 +172,6 @@ def _do_delete(*, query, orm_model, skip_archive, session):
source_table = metadata.tables[orm_model.name]
target_table = metadata.tables[target_table_name]
logger.debug("rows moved; purging from %s", source_table.name)
- bind = session.get_bind()
- dialect_name = bind.dialect.name
if dialect_name == "sqlite":
pk_cols = source_table.primary_key.columns
delete = source_table.delete().where(