This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 78cc2e89e5 Fix db clean command for mysql db (#29999)
78cc2e89e5 is described below

commit 78cc2e89e5d46738664b7442dc6f5a00b23d1ef5
Author: Vu Tan <[email protected]>
AuthorDate: Mon Mar 13 22:58:02 2023 +0900

    Fix db clean command for mysql db (#29999)
---
 airflow/utils/db_cleanup.py | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index 7ce73dd941..03b233f740 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -150,9 +150,21 @@ def _do_delete(*, query, orm_model, skip_archive, session):
     timestamp_str = re.sub(r"[^\d]", "", datetime.utcnow().isoformat())[:14]
     target_table_name = f"{ARCHIVE_TABLE_PREFIX}{orm_model.name}__{timestamp_str}"
     print(f"Moving data to table {target_table_name}")
-    stmt = CreateTableAs(target_table_name, query.selectable)
-    logger.debug("ctas query:\n%s", stmt.compile())
-    session.execute(stmt)
+    bind = session.get_bind()
+    dialect_name = bind.dialect.name
+    if dialect_name == "mysql":
+        # MySQL with replication needs this split into two queries, so just do it for all MySQL
+        # ERROR 1786 (HY000): Statement violates GTID consistency: CREATE TABLE ... SELECT.
+        session.execute(f"CREATE TABLE {target_table_name} LIKE {orm_model.name}")
+        metadata = reflect_tables([target_table_name], session)
+        target_table = metadata.tables[target_table_name]
+        insert_stm = target_table.insert().from_select(target_table.c, query)
+        logger.debug("insert statement:\n%s", insert_stm.compile())
+        session.execute(insert_stm)
+    else:
+        stmt = CreateTableAs(target_table_name, query.selectable)
+        logger.debug("ctas query:\n%s", stmt.compile())
+        session.execute(stmt)
     session.commit()
 
     # delete the rows from the old table
@@ -160,8 +172,6 @@ def _do_delete(*, query, orm_model, skip_archive, session):
     source_table = metadata.tables[orm_model.name]
     target_table = metadata.tables[target_table_name]
     logger.debug("rows moved; purging from %s", source_table.name)
-    bind = session.get_bind()
-    dialect_name = bind.dialect.name
     if dialect_name == "sqlite":
         pk_cols = source_table.primary_key.columns
         delete = source_table.delete().where(

Reply via email to