This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f6bb4746ef Cascade update of taskinstance to TaskMap table (#31445)
f6bb4746ef is described below

commit f6bb4746efbc6a94fa17b6c77b31d9fb17305ffc
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Wed May 24 11:54:45 2023 +0100

    Cascade update of taskinstance to TaskMap table (#31445)
---
 .../0125_2_6_2_add_onupdate_cascade_to_taskmap.py  | 62 ++++++++++++++++++++++
 ...0126_2_7_0_add_index_to_task_instance_table.py} |  4 +-
 airflow/models/taskmap.py                          |  1 +
 docs/apache-airflow/img/airflow_erd.sha256         |  2 +-
 docs/apache-airflow/img/airflow_erd.svg            |  8 +--
 docs/apache-airflow/migrations-ref.rst             |  4 +-
 tests/models/test_taskinstance.py                  | 35 +++++++++++-
 7 files changed, 107 insertions(+), 9 deletions(-)

diff --git a/airflow/migrations/versions/0125_2_6_2_add_onupdate_cascade_to_taskmap.py b/airflow/migrations/versions/0125_2_6_2_add_onupdate_cascade_to_taskmap.py
new file mode 100644
index 0000000000..cdc21ddaa1
--- /dev/null
+++ b/airflow/migrations/versions/0125_2_6_2_add_onupdate_cascade_to_taskmap.py
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add ``onupdate`` cascade to ``task_map`` table
+
+Revision ID: c804e5c76e3e
+Revises: 98ae134e6fff
+Create Date: 2023-05-19 23:30:57.368617
+
+"""
+from __future__ import annotations
+
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "c804e5c76e3e"
+down_revision = "98ae134e6fff"
+branch_labels = None
+depends_on = None
+airflow_version = "2.6.2"
+
+
+def upgrade():
+    """Apply Add onupdate cascade to taskmap"""
+    with op.batch_alter_table("task_map") as batch_op:
+        batch_op.drop_constraint("task_map_task_instance_fkey", type_="foreignkey")
+        batch_op.create_foreign_key(
+            "task_map_task_instance_fkey",
+            "task_instance",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ondelete="CASCADE",
+            onupdate="CASCADE",
+        )
+
+
+def downgrade():
+    """Unapply Add onupdate cascade to taskmap"""
+    with op.batch_alter_table("task_map") as batch_op:
+        batch_op.drop_constraint("task_map_task_instance_fkey", type_="foreignkey")
+        batch_op.create_foreign_key(
+            "task_map_task_instance_fkey",
+            "task_instance",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ondelete="CASCADE",
+        )
diff --git a/airflow/migrations/versions/0125_2_7_0_add_index_to_task_instance_table.py b/airflow/migrations/versions/0126_2_7_0_add_index_to_task_instance_table.py
similarity index 96%
rename from airflow/migrations/versions/0125_2_7_0_add_index_to_task_instance_table.py
rename to airflow/migrations/versions/0126_2_7_0_add_index_to_task_instance_table.py
index b9a1df82fd..225776119e 100644
--- a/airflow/migrations/versions/0125_2_7_0_add_index_to_task_instance_table.py
+++ b/airflow/migrations/versions/0126_2_7_0_add_index_to_task_instance_table.py
@@ -19,7 +19,7 @@
 """Add index to task_instance table
 
 Revision ID: 937cbd173ca1
-Revises: 98ae134e6fff
+Revises: c804e5c76e3e
 Create Date: 2023-05-03 11:31:32.527362
 
 """
@@ -29,7 +29,7 @@ from alembic import op
 
 # revision identifiers, used by Alembic.
 revision = "937cbd173ca1"
-down_revision = "98ae134e6fff"
+down_revision = "c804e5c76e3e"
 branch_labels = None
 depends_on = None
 airflow_version = "2.7.0"
diff --git a/airflow/models/taskmap.py b/airflow/models/taskmap.py
index e7abcc1b6e..9704cfb5cc 100644
--- a/airflow/models/taskmap.py
+++ b/airflow/models/taskmap.py
@@ -72,6 +72,7 @@ class TaskMap(Base):
             ],
             name="task_map_task_instance_fkey",
             ondelete="CASCADE",
+            onupdate="CASCADE",
         ),
     )
 
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256
index 1f2c2c1f34..2405a400db 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-4987842fd67d29e194f1117e127d3291ba60d3fbc3e81cba75ce93884c263321
\ No newline at end of file
+2d0924c9f5c471214953113e8830b842fc45e9344ff6d67b46267cac99e2cdef
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg
index 8439c226f8..142c05dfd8 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
@@ -1225,28 +1225,28 @@
 <g id="edge48" class="edge">
 <title>task_instance&#45;&#45;xcom</title>
 <path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" d="M1137.01,-488.65C1161.12,-465.83 1186.07,-443.06 1210,-422 1216.27,-416.48 1222.72,-410.89 1229.27,-405.29"/>
-<text text-anchor="start" x="1219.27" y="-394.09" font-family="Times,serif" font-size="14.00">1</text>
+<text text-anchor="start" x="1198.27" y="-394.09" font-family="Times,serif" font-size="14.00">0..N</text>
 <text text-anchor="start" x="1137.01" y="-477.45" font-family="Times,serif" font-size="14.00">1</text>
 </g>
 <!-- task_instance&#45;&#45;xcom -->
 <g id="edge49" class="edge">
 <title>task_instance&#45;&#45;xcom</title>
 <path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" d="M1137.01,-506.4C1161.12,-483.83 1186.07,-461.06 1210,-440 1216.27,-434.48 1222.72,-428.89 1229.27,-423.28"/>
-<text text-anchor="start" x="1198.27" y="-412.08" font-family="Times,serif" font-size="14.00">0..N</text>
+<text text-anchor="start" x="1219.27" y="-412.08" font-family="Times,serif" font-size="14.00">1</text>
 <text text-anchor="start" x="1137.01" y="-495.2" font-family="Times,serif" font-size="14.00">1</text>
 </g>
 <!-- task_instance&#45;&#45;xcom -->
 <g id="edge50" class="edge">
 <title>task_instance&#45;&#45;xcom</title>
 <path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" d="M1137.01,-524.15C1161.12,-501.83 1186.07,-479.06 1210,-458 1216.66,-452.14 1223.53,-446.19 1230.5,-440.21"/>
-<text text-anchor="start" x="1199.5" y="-444.01" font-family="Times,serif" font-size="14.00">0..N</text>
+<text text-anchor="start" x="1220.5" y="-444.01" font-family="Times,serif" font-size="14.00">1</text>
 <text text-anchor="start" x="1137.01" y="-512.95" font-family="Times,serif" font-size="14.00">1</text>
 </g>
 <!-- task_instance&#45;&#45;xcom -->
 <g id="edge51" class="edge">
 <title>task_instance&#45;&#45;xcom</title>
 <path fill="none" stroke="#7f7f7f" stroke-dasharray="5,2" d="M1137.01,-541.9C1161.12,-519.83 1186.07,-497.06 1210,-476 1223.32,-464.28 1237.48,-452.19 1251.63,-440.12"/>
-<text text-anchor="start" x="1241.63" y="-443.92" font-family="Times,serif" font-size="14.00">1</text>
+<text text-anchor="start" x="1251.63" y="-443.92" font-family="Times,serif" font-size="14.00">0..N</text>
 <text text-anchor="start" x="1137.01" y="-530.7" font-family="Times,serif" font-size="14.00">1</text>
 </g>
 <!-- log_template -->
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index 2fdc682025..7b09a46f8c 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | Revision ID                     | Revises ID        | Airflow Version   | Description                                                  |
 +=================================+===================+===================+==============================================================+
-| ``937cbd173ca1`` (head)         | ``98ae134e6fff``  | ``2.7.0``         | Add index to task_instance table                             |
+| ``937cbd173ca1`` (head)         | ``c804e5c76e3e``  | ``2.7.0``         | Add index to task_instance table                             |
++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
+| ``c804e5c76e3e``                | ``98ae134e6fff``  | ``2.6.2``         | Add ``onupdate`` cascade to ``task_map`` table               |
 +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
 | ``98ae134e6fff``                | ``6abdffdd4815``  | ``2.6.0``         | Increase length of user identifier columns in ``ab_user``    |
 |                                 |                   |                   | and ``ab_register_user`` tables                              |
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 0dcfb63214..0a20891896 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -70,7 +70,7 @@ from airflow.operators.empty import EmptyOperator
 from airflow.operators.python import PythonOperator
 from airflow.sensors.base import BaseSensorOperator
 from airflow.sensors.python import PythonSensor
-from airflow.serialization.serialized_objects import SerializedBaseOperator
+from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG
 from airflow.settings import TIMEZONE
 from airflow.stats import Stats
 from airflow.ti_deps.dep_context import DepContext
@@ -3643,6 +3643,39 @@ class TestTaskInstanceRecordTaskMapXComPush:
         assert task_map.length == expected_length
         assert task_map.keys == expected_keys
 
+    def test_no_error_on_changing_from_non_mapped_to_mapped(self, dag_maker, session):
+        """If a task changes from non-mapped to mapped, don't fail on integrity error."""
+        with dag_maker(dag_id="test_no_error_on_changing_from_non_mapped_to_mapped") as dag:
+
+            @dag.task()
+            def add_one(x):
+                return [x + 1]
+
+            @dag.task()
+            def add_two(x):
+                return x + 2
+
+            task1 = add_one(2)
+            add_two.expand(x=task1)
+
+        dr = dag_maker.create_dagrun()
+        ti = dr.get_task_instance(task_id="add_one")
+        ti.run()
+        assert ti.state == TaskInstanceState.SUCCESS
+        dag._remove_task("add_one")
+        with dag:
+            task1 = add_one.expand(x=[1, 2, 3]).operator
+        serialized_dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
+        dr.dag = serialized_dag
+        dr.verify_integrity(session=session)
+        ti = dr.get_task_instance(task_id="add_one")
+        assert ti.state == TaskInstanceState.REMOVED
+        dag.clear()
+        ti.refresh_from_task(task1)
+        # This should not raise an integrity error
+        dr.task_instance_scheduling_decisions()
+
 
 class TestMappedTaskInstanceReceiveValue:
     @pytest.mark.parametrize(

Reply via email to