This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 06da35cd0f Fix AIP-74 migration errors (#43313)
06da35cd0f is described below
commit 06da35cd0f2ea9bb23353714963b750747efd8ad
Author: Wei Lee <[email protected]>
AuthorDate: Thu Oct 24 17:33:05 2024 +0800
Fix AIP-74 migration errors (#43313)
* fix(migrations): fix typos in the dataset-to-asset migration files and
model
* fix(migrations): add server_default to the newly added "group" column in
the dataset table
Without this change, databases with existing dataset rows break on upgrade
because the new column is nullable=False and has no server-side default.
---
.../0035_3_0_0_add_name_field_to_dataset_model.py | 4 +++-
.../versions/0040_3_0_0_rename_dataset_as_asset.py | 20 ++++++++++----------
airflow/models/asset.py | 2 +-
docs/apache-airflow/img/airflow_erd.sha256 | 2 +-
4 files changed, 15 insertions(+), 13 deletions(-)
diff --git
a/airflow/migrations/versions/0035_3_0_0_add_name_field_to_dataset_model.py
b/airflow/migrations/versions/0035_3_0_0_add_name_field_to_dataset_model.py
index 2460b6956c..353dcbf0f8 100644
--- a/airflow/migrations/versions/0035_3_0_0_add_name_field_to_dataset_model.py
+++ b/airflow/migrations/versions/0035_3_0_0_add_name_field_to_dataset_model.py
@@ -61,7 +61,9 @@ def upgrade():
# Add 'name' column. Set it to nullable for now.
with op.batch_alter_table("dataset", schema=None) as batch_op:
batch_op.add_column(sa.Column("name", _STRING_COLUMN_TYPE))
- batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE,
default=str, nullable=False))
+ batch_op.add_column(
+ sa.Column("group", _STRING_COLUMN_TYPE, default=str,
server_default="", nullable=False)
+ )
# Fill name from uri column.
with Session(bind=op.get_bind()) as session:
session.execute(sa.text("update dataset set name=uri"))
diff --git a/airflow/migrations/versions/0040_3_0_0_rename_dataset_as_asset.py
b/airflow/migrations/versions/0040_3_0_0_rename_dataset_as_asset.py
index ffe4fe7b83..e400fd5301 100644
--- a/airflow/migrations/versions/0040_3_0_0_rename_dataset_as_asset.py
+++ b/airflow/migrations/versions/0040_3_0_0_rename_dataset_as_asset.py
@@ -136,7 +136,7 @@ def upgrade():
unique=False,
)
batch_op.create_foreign_key(
- constraint_name="asset_alias_asset_alias_id_fk_key",
+ constraint_name="asset_alias_asset_alias_id_fkey",
referent_table="asset_alias",
local_cols=["alias_id"],
remote_cols=["id"],
@@ -152,7 +152,7 @@ def upgrade():
unique=False,
)
batch_op.create_foreign_key(
- constraint_name="asset_alias_asset_asset_id_fk_key",
+ constraint_name="asset_alias_asset_asset_id_fkey",
referent_table="asset",
local_cols=["asset_id"],
remote_cols=["id"],
@@ -169,7 +169,7 @@ def upgrade():
unique=False,
)
batch_op.create_foreign_key(
- constraint_name=op.f("asset_alias_asset_event_asset_id_fkey"),
+ constraint_name=op.f("asset_alias_asset_event_alias_id_fkey"),
referent_table="asset_alias",
local_cols=["alias_id"],
remote_cols=["id"],
@@ -185,7 +185,7 @@ def upgrade():
unique=False,
)
batch_op.create_foreign_key(
- constraint_name=op.f("asset_alias_asset_event_event_id_fk_key"),
+ constraint_name=op.f("asset_alias_asset_event_event_id_fkey"),
referent_table="asset_event",
local_cols=["event_id"],
remote_cols=["id"],
@@ -199,7 +199,7 @@ def upgrade():
_rename_pk_constraint(
batch_op=batch_op,
original_name="dsdar_pkey",
- new_name="asaar_pkey",
+ new_name="dsaar_pkey",
columns=["alias_id", "dag_id"],
)
_rename_index(
@@ -218,7 +218,7 @@ def upgrade():
ondelete="CASCADE",
)
batch_op.create_foreign_key(
- constraint_name="dsaar_dag_id_fkey",
+ constraint_name="dsaar_dag_fkey",
referent_table="dag",
local_cols=["dag_id"],
remote_cols=["dag_id"],
@@ -230,7 +230,7 @@ def upgrade():
with op.batch_alter_table("dag_schedule_asset_reference", schema=None) as
batch_op:
batch_op.drop_constraint("dsdr_dag_id_fkey", type_="foreignkey")
- if op.get_bind().dialect.name in ("postgres", "mysql"):
+ if op.get_bind().dialect.name in ("postgresql", "mysql"):
batch_op.drop_constraint("dsdr_dataset_fkey", type_="foreignkey")
_rename_pk_constraint(
@@ -266,7 +266,7 @@ def upgrade():
batch_op.alter_column("dataset_id", new_column_name="asset_id",
type_=sa.Integer(), nullable=False)
batch_op.drop_constraint("todr_dag_id_fkey", type_="foreignkey")
- if op.get_bind().dialect.name in ("postgres", "mysql"):
+ if op.get_bind().dialect.name in ("postgresql", "mysql"):
batch_op.drop_constraint("todr_dataset_fkey", type_="foreignkey")
_rename_pk_constraint(
@@ -303,7 +303,7 @@ def upgrade():
batch_op.alter_column("dataset_id", new_column_name="asset_id",
type_=sa.Integer(), nullable=False)
batch_op.drop_constraint("ddrq_dag_fkey", type_="foreignkey")
- if op.get_bind().dialect.name in ("postgres", "mysql"):
+ if op.get_bind().dialect.name in ("postgresql", "mysql"):
batch_op.drop_constraint("ddrq_dataset_fkey", type_="foreignkey")
_rename_pk_constraint(
@@ -499,7 +499,7 @@ def downgrade():
_rename_pk_constraint(
batch_op=batch_op,
- original_name="asaar_pkey",
+ original_name="dsaar_pkey",
new_name="dsdar_pkey",
columns=["alias_id", "dag_id"],
)
diff --git a/airflow/models/asset.py b/airflow/models/asset.py
index 2b810e3d0f..fc77cb7a31 100644
--- a/airflow/models/asset.py
+++ b/airflow/models/asset.py
@@ -295,7 +295,7 @@ class DagScheduleAssetAliasReference(Base):
__tablename__ = "dag_schedule_asset_alias_reference"
__table_args__ = (
- PrimaryKeyConstraint(alias_id, dag_id, name="asaar_pkey"),
+ PrimaryKeyConstraint(alias_id, dag_id, name="dsaar_pkey"),
ForeignKeyConstraint(
(alias_id,),
["asset_alias.id"],
diff --git a/docs/apache-airflow/img/airflow_erd.sha256
b/docs/apache-airflow/img/airflow_erd.sha256
index c67b2d5abc..73c838e48a 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-fa87dd4d48e630a5e523157bebb8710fcc4cee9527b735ac67ec1274d5858bce
\ No newline at end of file
+bda4fb36d2ac0f34e60a9969b6c1c1e6a98b555b0fc6d0e7bfcee9a89fb95fbf
\ No newline at end of file