Lee-W commented on code in PR #64838:
URL: https://github.com/apache/airflow/pull/64838#discussion_r3062870728


##########
airflow-core/src/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py:
##########
@@ -105,13 +104,12 @@ def _drop_fkey_if_exists(table, constraint_name):
 
     if dialect_name == "sqlite":
         # SQLite requires foreign key constraints to be disabled during batch 
operations
-        conn.execute(text("PRAGMA foreign_keys=OFF"))
-        try:
-            with op.batch_alter_table(table, schema=None) as batch_op:
-                batch_op.drop_constraint(op.f(constraint_name), 
type_="foreignkey")
-        except ValueError:
-            pass
-        conn.execute(text("PRAGMA foreign_keys=ON"))
+        with disable_sqlite_fkeys(op):

Review Comment:
   Same comment applies here as well — the same simplification/concern raised on the other migration files in this review.



##########
airflow-core/src/airflow/migrations/versions/0017_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py:
##########
@@ -76,9 +78,9 @@ def upgrade():
     elif conn.dialect.name == "sqlite":
         # SQLite does not support DROP CONSTRAINT
         # We have to recreate the table without the constraint
-        conn.execute(sa.text("PRAGMA foreign_keys=off"))

Review Comment:
   Why do we want to do this? We already know the `dialect` is SQLite here.



##########
airflow-core/src/airflow/migrations/versions/0082_3_1_0_make_bundle_name_not_nullable.py:
##########
@@ -61,61 +61,48 @@ def upgrade():
                     ('dags-folder');
                     """)
         )
-    if dialect_name == "sqlite":
-        op.execute(text("PRAGMA foreign_keys=OFF"))
-        op.execute(
-            text("""
-                    INSERT OR IGNORE INTO dag_bundle (name) VALUES
-                    ('example_dags'),
-                    ('dags-folder');
-                    """)
-        )
 
-    conn = op.get_bind()
-    with ignore_sqlite_value_error(), op.batch_alter_table("dag", schema=None) 
as batch_op:
-        conn.execute(
-            text(
-                """
-                UPDATE dag
-                SET bundle_name =
-                    CASE
-                        WHEN fileloc LIKE '%/airflow/example_dags/%' THEN 
'example_dags'
-                        ELSE 'dags-folder'
-                    END
-                WHERE bundle_name IS NULL
-                """
+    with disable_sqlite_fkeys(op):
+        if dialect_name == "sqlite":

Review Comment:
   I don't think we need this `dialect_name == "sqlite"` check inside the `disable_sqlite_fkeys` block — can it be removed?



##########
airflow-core/src/airflow/migrations/versions/0082_3_1_0_make_bundle_name_not_nullable.py:
##########
@@ -61,61 +61,48 @@ def upgrade():
                     ('dags-folder');
                     """)
         )
-    if dialect_name == "sqlite":
-        op.execute(text("PRAGMA foreign_keys=OFF"))
-        op.execute(
-            text("""
-                    INSERT OR IGNORE INTO dag_bundle (name) VALUES
-                    ('example_dags'),
-                    ('dags-folder');
-                    """)
-        )
 
-    conn = op.get_bind()
-    with ignore_sqlite_value_error(), op.batch_alter_table("dag", schema=None) 
as batch_op:
-        conn.execute(
-            text(
-                """
-                UPDATE dag
-                SET bundle_name =
-                    CASE
-                        WHEN fileloc LIKE '%/airflow/example_dags/%' THEN 
'example_dags'
-                        ELSE 'dags-folder'
-                    END
-                WHERE bundle_name IS NULL
-                """
+    with disable_sqlite_fkeys(op):
+        if dialect_name == "sqlite":
+            op.execute(
+                text("""
+                        INSERT OR IGNORE INTO dag_bundle (name) VALUES
+                        ('example_dags'),
+                        ('dags-folder');
+                        """)
             )
-        )
-        # drop the foreign key temporarily and recreate it once both columns 
are changed
-        batch_op.drop_constraint(batch_op.f("dag_bundle_name_fkey"), 
type_="foreignkey")
-        batch_op.alter_column("bundle_name", nullable=False, 
existing_type=StringID())
 
-    with op.batch_alter_table("dag_bundle", schema=None) as batch_op:
-        batch_op.alter_column("name", nullable=False, existing_type=StringID())
+        conn = op.get_bind()
+        with ignore_sqlite_value_error(), op.batch_alter_table("dag", 
schema=None) as batch_op:
+            conn.execute(
+                text(
+                    """
+                    UPDATE dag
+                    SET bundle_name =
+                        CASE
+                            WHEN fileloc LIKE '%/airflow/example_dags/%' THEN 
'example_dags'
+                            ELSE 'dags-folder'
+                        END
+                    WHERE bundle_name IS NULL
+                    """
+                )
+            )
+            # drop the foreign key temporarily and recreate it once both 
columns are changed
+            batch_op.drop_constraint(batch_op.f("dag_bundle_name_fkey"), 
type_="foreignkey")
+            batch_op.alter_column("bundle_name", nullable=False, 
existing_type=StringID())
 
-    with op.batch_alter_table("dag", schema=None) as batch_op:
-        batch_op.create_foreign_key(
-            batch_op.f("dag_bundle_name_fkey"), "dag_bundle", ["bundle_name"], 
["name"]
-        )
+        with op.batch_alter_table("dag_bundle", schema=None) as batch_op:
+            batch_op.alter_column("name", nullable=False, 
existing_type=StringID())
 
-    if dialect_name == "sqlite":
-        op.execute(text("PRAGMA foreign_keys=ON"))
+        with op.batch_alter_table("dag", schema=None) as batch_op:
+            batch_op.create_foreign_key(
+                batch_op.f("dag_bundle_name_fkey"), "dag_bundle", 
["bundle_name"], ["name"]
+            )
 
 
 def downgrade():
     """Make bundle_name nullable."""
-    import contextlib
-
-    dialect_name = op.get_bind().dialect.name
-    exitstack = contextlib.ExitStack()
-
-    if dialect_name == "sqlite":
-        # SQLite requires foreign key constraints to be disabled during batch 
operations
-        conn = op.get_bind()
-        conn.execute(text("PRAGMA foreign_keys=OFF"))
-        exitstack.callback(conn.execute, text("PRAGMA foreign_keys=ON"))
-
-    with exitstack:
+    with disable_sqlite_fkeys(op):

Review Comment:
   this is good 👍



##########
airflow-core/src/airflow/migrations/versions/0017_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py:
##########
@@ -213,60 +214,59 @@ def upgrade():
     elif conn.dialect.name == "sqlite":
         # SQLite does not support DROP CONSTRAINT
         # We have to recreate the table without the constraint
-        conn.execute(sa.text("PRAGMA foreign_keys=off"))
-        conn.execute(
-            sa.text("""
-            CREATE TABLE dag_run_new (
-                id INTEGER NOT NULL,
-                dag_id VARCHAR(250) NOT NULL,
-                queued_at TIMESTAMP,
-                execution_date TIMESTAMP NOT NULL,
-                start_date TIMESTAMP,
-                end_date TIMESTAMP,
-                state VARCHAR(50),
-                run_id VARCHAR(250) NOT NULL,
-                creating_job_id INTEGER,
-                external_trigger BOOLEAN,
-                run_type VARCHAR(50) NOT NULL,
-                conf BLOB,
-                data_interval_start TIMESTAMP,
-                data_interval_end TIMESTAMP,
-                last_scheduling_decision TIMESTAMP,
-                dag_hash VARCHAR(32),
-                log_template_id INTEGER,
-                updated_at TIMESTAMP,
-                clear_number INTEGER DEFAULT '0' NOT NULL,
-                CONSTRAINT dag_run_pkey PRIMARY KEY (id),
-                CONSTRAINT dag_run_dag_id_execution_date_key UNIQUE (dag_id, 
execution_date),
-                CONSTRAINT dag_run_dag_id_run_id_key UNIQUE (dag_id, run_id),
-                CONSTRAINT task_instance_log_template_id_fkey FOREIGN 
KEY(log_template_id) REFERENCES log_template (id) ON DELETE NO ACTION
-            )
-        """)
-        )
-        headers = (
-            "id, dag_id, queued_at, execution_date, start_date, end_date, 
state, run_id, creating_job_id, "
-            "external_trigger, run_type, conf, data_interval_start, 
data_interval_end, "
-            "last_scheduling_decision, dag_hash, log_template_id, updated_at, 
clear_number"
-        )
-        conn.execute(sa.text(f"INSERT INTO dag_run_new ({headers}) SELECT 
{headers} FROM dag_run"))
-        conn.execute(sa.text("DROP TABLE dag_run"))
-        conn.execute(sa.text("ALTER TABLE dag_run_new RENAME TO dag_run"))
-        conn.execute(sa.text("PRAGMA foreign_keys=on"))
-        with op.batch_alter_table("dag_run") as batch_op:
-            batch_op.create_index("dag_id_state", ["dag_id", "state"], 
if_not_exists=True)
-            batch_op.create_index("idx_dag_run_dag_id", ["dag_id"], 
if_not_exists=True)
-            batch_op.create_index(
-                "idx_dag_run_running_dags",
-                ["state", "dag_id"],
-                sqlite_where=sa.text("state='running'"),
-                if_not_exists=True,
+        with disable_sqlite_fkeys(op):

Review Comment:
   Same comment applies here as well — the same concern raised on the earlier hunk of this file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to