This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e84e7a53564 feat(migrations): remove dataset with duplicate uri when 
downgrading (#48501)
e84e7a53564 is described below

commit e84e7a5356468ac61d7e143e1211f154e33f936f
Author: Wei Lee <[email protected]>
AuthorDate: Sun Mar 30 07:39:17 2025 +0800

    feat(migrations): remove dataset with duplicate uri when downgrading 
(#48501)
    
    close: #48462
---
 airflow-core/docs/img/airflow_erd.sha256                 |  2 +-
 .../0036_3_0_0_add_name_field_to_dataset_model.py        | 16 +++++++++++++++-
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/airflow-core/docs/img/airflow_erd.sha256 
b/airflow-core/docs/img/airflow_erd.sha256
index b3e35be2082..cf4c92fd2e8 100644
--- a/airflow-core/docs/img/airflow_erd.sha256
+++ b/airflow-core/docs/img/airflow_erd.sha256
@@ -1 +1 @@
-0a402902bfd550af6278c539ca5ceed8c29a48605ae29124bf01f1007827a8a0
\ No newline at end of file
+c87c97a9796cf19bdb539fa153248640f86c5db94a9f08192da1ed8e023f71e6
\ No newline at end of file
diff --git 
a/airflow-core/src/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py
 
b/airflow-core/src/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py
index 221515ce20a..c7112f91b3c 100644
--- 
a/airflow-core/src/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py
+++ 
b/airflow-core/src/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py
@@ -80,7 +80,7 @@ def upgrade():
 
 
 def downgrade():
-    # Remove orphaned datasets if there is an active one that shares the sanem 
URI.
+    # Remove orphaned datasets if there is an active one that shares the same 
URI.
     # We will create a unique constraint on URI and can't fail with a 
duplicate.
     # This drops history, which is unfortunate, but reasonable for downgrade.
     if op.get_bind().dialect.name == "mysql":
@@ -104,6 +104,20 @@ def downgrade():
             "delete from dataset as d1 where d1.is_orphaned = true "
             "and exists (select 1 from dataset as d2 where d1.uri = d2.uri and 
d2.is_orphaned = false)"
         )
+
+    # Keep only the datasets with min id if multiple orphaned datasets with 
the same uri exist.
+    # This usually happens when all the dags are turned off.
+    op.execute(
+        """
+        with unique_dataset as (select min(id) as min_id, uri as uri from 
dataset group by uri),
+        duplicate_dataset_id as (
+            select id from dataset join unique_dataset
+            on dataset.uri = unique_dataset.uri
+            where dataset.id > unique_dataset.min_id
+        )
+        delete from dataset where id in (select * from duplicate_dataset_id)
+        """
+    )
     with op.batch_alter_table("dataset", schema=None) as batch_op:
         batch_op.drop_index("idx_dataset_name_uri_unique")
         batch_op.create_index("idx_uri_unique", ["uri"], unique=True)

Reply via email to