xchwan commented on code in PR #55500:
URL: https://github.com/apache/airflow/pull/55500#discussion_r2471719477


##########
airflow-core/src/airflow/migrations/versions/0017_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py:
##########
@@ -44,35 +44,24 @@ def upgrade():
     conn = op.get_bind()
     if conn.dialect.name == "mysql":
         # TODO: Rewrite these queries to use alembic when lowest MYSQL version 
supports IF EXISTS
-        conn.execute(
+        # Remove prepared statements for PyMySQL
+        result = conn.execute(
             sa.text("""
-        set @var=if((SELECT true FROM information_schema.TABLE_CONSTRAINTS 
WHERE
-            CONSTRAINT_SCHEMA = DATABASE() AND
-            TABLE_NAME        = 'connection' AND
-            CONSTRAINT_NAME   = 'unique_conn_id' AND
-            CONSTRAINT_TYPE   = 'UNIQUE') = true,'ALTER TABLE connection
-            DROP INDEX unique_conn_id','select 1');
-
-        prepare stmt from @var;
-        execute stmt;
-        deallocate prepare stmt;
-        """)
+                SELECT CONSTRAINT_NAME
+                FROM information_schema.TABLE_CONSTRAINTS
+                WHERE CONSTRAINT_SCHEMA = DATABASE()
+                AND TABLE_NAME = 'connection'
+                AND CONSTRAINT_TYPE = 'UNIQUE';
+            """)
         )
-        # Dropping the below and recreating cause there's no IF NOT EXISTS in 
mysql
-        conn.execute(
-            sa.text("""
-                set @var=if((SELECT true FROM 
information_schema.TABLE_CONSTRAINTS WHERE
-                    CONSTRAINT_SCHEMA = DATABASE() AND
-                    TABLE_NAME        = 'connection' AND
-                    CONSTRAINT_NAME   = 'connection_conn_id_uq' AND
-                    CONSTRAINT_TYPE   = 'UNIQUE') = true,'ALTER TABLE 
connection
-                    DROP INDEX connection_conn_id_uq','select 1');
 
-                prepare stmt from @var;
-                execute stmt;
-                deallocate prepare stmt;
-                """)
-        )
+        rows = result.all() if result is not None else []
+        existing_indexes = {row[0] for row in rows}
+        drop_indexes = ["unique_conn_id", "connection_conn_id_uq"]
+        for idx in drop_indexes:
+            if idx in existing_indexes:
+                conn.execute(sa.text(f"ALTER TABLE connection DROP INDEX 
{idx}"))

Review Comment:
   Excuse me, how should I do to run it with connection values? I don't find 
any information in docs. I also wonder that CI contains Tests AMD / MySQL 
tests: core / DB-core:MySQL:8.0:3.10:API...Serialization. Is it not cover the 
test you expect?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to