This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e84e7a53564 feat(migrations): remove dataset with duplicate uri when
downgrading (#48501)
e84e7a53564 is described below
commit e84e7a5356468ac61d7e143e1211f154e33f936f
Author: Wei Lee <[email protected]>
AuthorDate: Sun Mar 30 07:39:17 2025 +0800
feat(migrations): remove dataset with duplicate uri when downgrading
(#48501)
close: #48462
---
airflow-core/docs/img/airflow_erd.sha256 | 2 +-
.../0036_3_0_0_add_name_field_to_dataset_model.py | 16 +++++++++++++++-
2 files changed, 16 insertions(+), 2 deletions(-)
diff --git a/airflow-core/docs/img/airflow_erd.sha256
b/airflow-core/docs/img/airflow_erd.sha256
index b3e35be2082..cf4c92fd2e8 100644
--- a/airflow-core/docs/img/airflow_erd.sha256
+++ b/airflow-core/docs/img/airflow_erd.sha256
@@ -1 +1 @@
-0a402902bfd550af6278c539ca5ceed8c29a48605ae29124bf01f1007827a8a0
\ No newline at end of file
+c87c97a9796cf19bdb539fa153248640f86c5db94a9f08192da1ed8e023f71e6
\ No newline at end of file
diff --git
a/airflow-core/src/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py
b/airflow-core/src/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py
index 221515ce20a..c7112f91b3c 100644
---
a/airflow-core/src/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py
+++
b/airflow-core/src/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py
@@ -80,7 +80,7 @@ def upgrade():
def downgrade():
- # Remove orphaned datasets if there is an active one that shares the sanem
URI.
+ # Remove orphaned datasets if there is an active one that shares the same
URI.
# We will to create a unique constraint on URI and can't fail with a
duplicate.
# This drops history, which is unfortunate, but reasonable for downgrade.
if op.get_bind().dialect.name == "mysql":
@@ -104,6 +104,20 @@ def downgrade():
"delete from dataset as d1 where d1.is_orphaned = true "
"and exists (select 1 from dataset as d2 where d1.uri = d2.uri and
d2.is_orphaned = false)"
)
+
+ # Keep only the datasets with min id if multiple orphaned datasets with
the same uri exist.
+ # This usually happens when all the dags are turned off.
+ op.execute(
+ """
+ with unique_dataset as (select min(id) as min_id, uri as uri from
dataset group by id),
+ duplicate_dataset_id as (
+ select id from dataset join unique_dataset
+ on dataset.uri = unique_dataset.uri
+ where dataset.id > unique_dataset.min_id
+ )
+ delete from dataset where id in (select * from duplicate_dataset_id)
+ """
+ )
with op.batch_alter_table("dataset", schema=None) as batch_op:
batch_op.drop_index("idx_dataset_name_uri_unique")
batch_op.create_index("idx_uri_unique", ["uri"], unique=True)