This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8ccff9244a Add cascade to `dag_tag` to `dag` foreignkey (#23444)
8ccff9244a is described below

commit 8ccff9244a6d1a936d8732721373b967e95ec404
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Fri May 27 15:28:49 2022 +0100

    Add cascade to `dag_tag` to `dag` foreignkey (#23444)
    
    Bulk delete does not work if the cascade behaviour of a foreign key
    is set only on the Python side (relationship configuration). To allow
    bulk deletion of DAGs, we need to set up cascade deletion in the DB.
    
    The warning on query.delete at
    
    https://docs.sqlalchemy.org/en/14/orm/session_basics.html#selecting-a-synchronization-strategy
    states that:
    
        The operations do not offer in-Python cascading of relationships - it is
        assumed that ON UPDATE CASCADE and/or ON DELETE CASCADE is configured for
        any foreign key references which require it, otherwise the database may
        emit an integrity violation if foreign key references are being enforced.
    
    Another alternative is to avoid bulk deletion of DAGs, but I prefer that we
    support bulk deletes.
    
    This will break offline SQL generation for MSSQL (already broken before now :) ).
    Also, since there's only one foreign key in the `dag_tag` table, I assume that
    the foreign key is named `dag_tag_ibfk_1` in MySQL (InnoDB's default name for
    a table's first unnamed foreign key). This avoids having to query the DB for
    the name.
    
    The foreign key is now explicitly named, which will make future upgrades easier.
---
 airflow/migrations/utils.py                        |  3 +-
 ...0110_2_3_2_add_cascade_to_dag_tag_foreignkey.py | 84 ++++++++++++++++++++++
 airflow/models/dag.py                              |  8 ++-
 docs/apache-airflow/migrations-ref.rst             |  4 +-
 4 files changed, 95 insertions(+), 4 deletions(-)

diff --git a/airflow/migrations/utils.py b/airflow/migrations/utils.py
index f1f3ea0442..5737fa9507 100644
--- a/airflow/migrations/utils.py
+++ b/airflow/migrations/utils.py
@@ -35,7 +35,8 @@ def get_mssql_table_constraints(conn, table_name):
      FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc
      JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu ON 
ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
      WHERE tc.TABLE_NAME = '{table_name}' AND
-     (tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) = 
'UNIQUE')
+     (tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) = 
'UNIQUE'
+     or UPPER(tc.CONSTRAINT_TYPE) = 'FOREIGN KEY')
     """
     result = conn.execute(query).fetchall()
     constraint_dict = defaultdict(lambda: defaultdict(list))
diff --git 
a/airflow/migrations/versions/0110_2_3_2_add_cascade_to_dag_tag_foreignkey.py 
b/airflow/migrations/versions/0110_2_3_2_add_cascade_to_dag_tag_foreignkey.py
new file mode 100644
index 0000000000..55d9e9754e
--- /dev/null
+++ 
b/airflow/migrations/versions/0110_2_3_2_add_cascade_to_dag_tag_foreignkey.py
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add cascade to dag_tag foreign key
+
+Revision ID: 3c94c427fdf6
+Revises: 1de7bc13c950
+Create Date: 2022-05-03 09:47:41.957710
+
+"""
+
+from alembic import op
+
+from airflow.migrations.utils import get_mssql_table_constraints
+
+# revision identifiers, used by Alembic.
+revision = '3c94c427fdf6'
+down_revision = '1de7bc13c950'
+branch_labels = None
+depends_on = None
+airflow_version = '2.3.2'
+
+
+def upgrade():
+    """Apply Add cascade to dag_tag foreignkey"""
+    conn = op.get_bind()
+    if conn.dialect.name == 'sqlite':
+        naming_convention = {
+            "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
+        }
+        with op.batch_alter_table(
+            'dag_tag', naming_convention=naming_convention, recreate='always'
+        ) as batch_op:
+            batch_op.drop_constraint('fk_dag_tag_dag_id_dag', type_='foreignkey')
+            batch_op.create_foreign_key(
+                "dag_tag_dag_id_fkey", 'dag', ['dag_id'], ['dag_id'], ondelete='CASCADE'
+            )
+    else:
+        with op.batch_alter_table('dag_tag') as batch_op:
+            if conn.dialect.name == 'mssql':
+                constraints = get_mssql_table_constraints(conn, 'dag_tag')
+                Fk, _ = constraints['FOREIGN KEY'].popitem()
+                batch_op.drop_constraint(Fk, type_='foreignkey')
+            if conn.dialect.name == 'postgresql':
+                batch_op.drop_constraint('dag_tag_dag_id_fkey', type_='foreignkey')
+            if conn.dialect.name == 'mysql':
+                batch_op.drop_constraint('dag_tag_ibfk_1', type_='foreignkey')
+
+            batch_op.create_foreign_key(
+                "dag_tag_dag_id_fkey", 'dag', ['dag_id'], ['dag_id'], ondelete='CASCADE'
+            )
+
+
+def downgrade():
+    """Unapply Add cascade to dag_tag foreignkey"""
+    conn = op.get_bind()
+    if conn.dialect.name == 'sqlite':
+        with op.batch_alter_table('dag_tag') as batch_op:
+            batch_op.drop_constraint('dag_tag_dag_id_fkey', type_='foreignkey')
+            batch_op.create_foreign_key("fk_dag_tag_dag_id_dag", 'dag', ['dag_id'], ['dag_id'])
+    else:
+        with op.batch_alter_table('dag_tag') as batch_op:
+            batch_op.drop_constraint('dag_tag_dag_id_fkey', type_='foreignkey')
+            batch_op.create_foreign_key(
+                None,
+                'dag',
+                ['dag_id'],
+                ['dag_id'],
+            )
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index f48883ec6b..f3684bb414 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2638,7 +2638,11 @@ class DagTag(Base):
 
     __tablename__ = "dag_tag"
     name = Column(String(100), primary_key=True)
-    dag_id = Column(String(ID_LEN), ForeignKey('dag.dag_id'), primary_key=True)
+    dag_id = Column(
+        String(ID_LEN),
+        ForeignKey('dag.dag_id', name='dag_tag_dag_id_fkey', 
ondelete='CASCADE'),
+        primary_key=True,
+    )
 
     def __repr__(self):
         return self.name
@@ -2689,7 +2693,7 @@ class DagModel(Base):
     timetable_description = Column(String(1000), nullable=True)
 
     # Tags for view filter
-    tags = relationship('DagTag', cascade='all,delete-orphan', 
backref=backref('dag'))
+    tags = relationship('DagTag', cascade='all, delete, delete-orphan', 
backref=backref('dag'))
 
     max_active_tasks = Column(Integer, nullable=False)
     max_active_runs = Column(Integer, nullable=True)
diff --git a/docs/apache-airflow/migrations-ref.rst 
b/docs/apache-airflow/migrations-ref.rst
index 1f81e8cbb2..ae765b99dd 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -27,7 +27,9 @@ Here's the list of all the Database Migrations that are 
executed via when you ru
 
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | Revision ID                     | Revises ID        | Airflow Version   | 
Description                                                  |
 
+=================================+===================+===================+==============================================================+
-| ``1de7bc13c950`` (head)         | ``b1b348e02d07``  | ``2.3.1``         | 
Add index for ``event`` column in ``log`` table.             |
+| ``3c94c427fdf6`` (head)         | ``1de7bc13c950``  | ``2.3.2``         | 
Add cascade to dag_tag foreign key                           |
++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
+| ``1de7bc13c950``                | ``b1b348e02d07``  | ``2.3.1``         | 
Add index for ``event`` column in ``log`` table.             |
 
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | ``b1b348e02d07``                | ``75d5ed6c2b43``  | ``2.3.0``         | 
Update dag.default_view to grid                              |
 
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+

Reply via email to