This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit cafad9d7fbb0c57ddbe8c0f67f67ed8a7401457b
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Mon Nov 7 14:03:10 2022 +0100

    Fix sqlalchemy primary key black-out error on DDRQ (#27538)
    
    closes https://github.com/apache/airflow/issues/27509
    
    (cherry picked from commit fc59b02cfac7fd691602edc92a7abac38ed51531)
---
 airflow/models/dataset.py         |  1 +
 tests/models/test_taskinstance.py | 37 +++++++++++++++++++++++++++++++++++++
 2 files changed, 38 insertions(+)

diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py
index b1a58e5442..c9cc3eca2a 100644
--- a/airflow/models/dataset.py
+++ b/airflow/models/dataset.py
@@ -116,6 +116,7 @@ class DagScheduleDatasetReference(Base):
             DagScheduleDatasetReference.dataset_id == foreign(DatasetDagRunQueue.dataset_id),
             DagScheduleDatasetReference.dag_id == foreign(DatasetDagRunQueue.target_dag_id),
         )""",
+        cascade="all, delete, delete-orphan",
     )
 
     __tablename__ = "dag_schedule_dataset_reference"
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 4eff50092b..f4edebcb08 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1916,6 +1916,43 @@ class TestTaskInstance:
         # check that no dataset events were generated
         assert session.query(DatasetEvent).count() == 0
 
+    def test_changing_of_dataset_when_ddrq_is_already_populated(self, dag_maker, session):
+        """
+        Test that when a task that produces dataset has ran, that changing the consumer
+        dag dataset will not cause primary key blank-out
+        """
+        from airflow import Dataset
+
+        with dag_maker(schedule=None, serialized=True) as dag1:
+
+            @task(outlets=Dataset("test/1"))
+            def test_task1():
+                print(1)
+
+            test_task1()
+
+        dr1 = dag_maker.create_dagrun()
+        test_task1 = dag1.get_task("test_task1")
+
+        with dag_maker(dag_id="testdag", schedule=[Dataset("test/1")], serialized=True):
+
+            @task
+            def test_task2():
+                print(1)
+
+            test_task2()
+
+        ti = dr1.get_task_instance(task_id="test_task1")
+        ti.run()
+        # Change the dataset.
+        with dag_maker(dag_id="testdag", schedule=[Dataset("test2/1")], serialized=True):
+
+            @task
+            def test_task2():
+                print(1)
+
+            test_task2()
+
     @staticmethod
     def _test_previous_dates_setup(
         schedule_interval: str | datetime.timedelta | None,

Reply via email to