This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-4-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit cafad9d7fbb0c57ddbe8c0f67f67ed8a7401457b Author: Ephraim Anierobi <[email protected]> AuthorDate: Mon Nov 7 14:03:10 2022 +0100 Fix sqlalchemy primary key blank-out error on DDRQ (#27538) closes https://github.com/apache/airflow/issues/27509 (cherry picked from commit fc59b02cfac7fd691602edc92a7abac38ed51531) --- airflow/models/dataset.py | 1 + tests/models/test_taskinstance.py | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py index b1a58e5442..c9cc3eca2a 100644 --- a/airflow/models/dataset.py +++ b/airflow/models/dataset.py @@ -116,6 +116,7 @@ class DagScheduleDatasetReference(Base): DagScheduleDatasetReference.dataset_id == foreign(DatasetDagRunQueue.dataset_id), DagScheduleDatasetReference.dag_id == foreign(DatasetDagRunQueue.target_dag_id), )""", + cascade="all, delete, delete-orphan", ) __tablename__ = "dag_schedule_dataset_reference" diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 4eff50092b..f4edebcb08 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -1916,6 +1916,43 @@ class TestTaskInstance: # check that no dataset events were generated assert session.query(DatasetEvent).count() == 0 + def test_changing_of_dataset_when_ddrq_is_already_populated(self, dag_maker, session): + """ + Test that when a task that produces dataset has ran, that changing the consumer + dag dataset will not cause primary key blank-out + """ + from airflow import Dataset + + with dag_maker(schedule=None, serialized=True) as dag1: + + @task(outlets=Dataset("test/1")) + def test_task1(): + print(1) + + test_task1() + + dr1 = dag_maker.create_dagrun() + test_task1 = dag1.get_task("test_task1") + + with dag_maker(dag_id="testdag", schedule=[Dataset("test/1")], serialized=True): + + @task + def test_task2(): + print(1) + + test_task2() + + ti = dr1.get_task_instance(task_id="test_task1") + ti.run() + # Change 
the dataset. + with dag_maker(dag_id="testdag", schedule=[Dataset("test2/1")], serialized=True): + + @task + def test_task2(): + print(1) + + test_task2() + @staticmethod def _test_previous_dates_setup( schedule_interval: str | datetime.timedelta | None,
