This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 771d56b38b5 Change default asset alias group to 'asset' (#44778)
771d56b38b5 is described below
commit 771d56b38b5c37dde289c69254e30ac98e1c5ab9
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Tue Dec 10 05:13:20 2024 +0800
Change default asset alias group to 'asset' (#44778)
An empty default group name is really unhelpful. I'm not entirely sure
whether we should dump everything into 'asset' by default, or whether
aliases should be separated into their own default 'alias' group. We
might have a better idea when we have a real UI for this.
Also fixed the migrations that provide a default for the new 'group'
column. Previously, the migrations used 'default', which is client-side
and would not work correctly. A manual create-fill-alter 3-step
approach fixes this.
---
.../versions/0036_3_0_0_add_name_field_to_dataset_model.py | 12 +++++++-----
.../0039_3_0_0_tweak_assetaliasmodel_to_match_asset.py | 9 ++++++++-
docs/apache-airflow/img/airflow_erd.sha256 | 2 +-
providers/tests/openlineage/plugins/test_utils.py | 2 +-
task_sdk/src/airflow/sdk/definitions/asset/__init__.py | 4 ++--
5 files changed, 19 insertions(+), 10 deletions(-)
diff --git
a/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py
b/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py
index 2676176692a..c3e8edfc3b9 100644
--- a/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py
+++ b/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py
@@ -58,20 +58,22 @@ def upgrade():
with op.batch_alter_table("dataset_alias", schema=None) as batch_op:
batch_op.drop_index("idx_name_unique")
batch_op.create_index("idx_dataset_alias_name_unique", ["name"],
unique=True)
- # Add 'name' column. Set it to nullable for now.
+ # Add 'name' and 'group' columns. Set them to nullable for now.
with op.batch_alter_table("dataset", schema=None) as batch_op:
batch_op.add_column(sa.Column("name", _STRING_COLUMN_TYPE))
- batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE,
default="", nullable=False))
- # Fill name from uri column.
+ batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE))
+ # Fill name from uri column, and group to 'asset'.
+ dataset_table = sa.table("dataset", sa.column("name"), sa.column("uri"),
sa.column("group"))
with Session(bind=op.get_bind()) as session:
- session.execute(sa.text("update dataset set name=uri"))
+
session.execute(sa.update(dataset_table).values(name=dataset_table.c.uri,
group="asset"))
session.commit()
- # Set the name column non-nullable.
+ # Set the name and group columns non-nullable.
# Now with values in there, we can create the new unique constraint and
index.
# Due to MySQL restrictions, we are also reducing the length on uri.
with op.batch_alter_table("dataset", schema=None) as batch_op:
batch_op.alter_column("name", existing_type=_STRING_COLUMN_TYPE,
nullable=False)
batch_op.alter_column("uri", type_=_STRING_COLUMN_TYPE, nullable=False)
+ batch_op.alter_column("group", type_=_STRING_COLUMN_TYPE,
default="asset", nullable=False)
batch_op.drop_index("idx_uri_unique")
batch_op.create_index("idx_dataset_name_uri_unique", ["name", "uri"],
unique=True)
diff --git
a/airflow/migrations/versions/0039_3_0_0_tweak_assetaliasmodel_to_match_asset.py
b/airflow/migrations/versions/0039_3_0_0_tweak_assetaliasmodel_to_match_asset.py
index f1b57974f05..25b4f187193 100644
---
a/airflow/migrations/versions/0039_3_0_0_tweak_assetaliasmodel_to_match_asset.py
+++
b/airflow/migrations/versions/0039_3_0_0_tweak_assetaliasmodel_to_match_asset.py
@@ -41,6 +41,7 @@ from __future__ import annotations
import sqlalchemy as sa
from alembic import op
+from sqlalchemy.orm import Session
# Revision identifiers, used by Alembic.
revision = "fb2d4922cd79"
@@ -59,7 +60,13 @@ def upgrade():
"""Tweak AssetAliasModel to match AssetModel."""
with op.batch_alter_table("dataset_alias", schema=None) as batch_op:
batch_op.alter_column("name", type_=_STRING_COLUMN_TYPE,
nullable=False)
- batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE,
default=str, nullable=False))
+ batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE))
+ dataset_alias_table = sa.table("dataset_alias", sa.column("group"))
+ with Session(bind=op.get_bind()) as session:
+ session.execute(sa.update(dataset_alias_table).values(group="asset"))
+ session.commit()
+ with op.batch_alter_table("dataset_alias", schema=None) as batch_op:
+ batch_op.alter_column("group", type_=_STRING_COLUMN_TYPE,
default="asset", nullable=False)
def downgrade():
diff --git a/docs/apache-airflow/img/airflow_erd.sha256
b/docs/apache-airflow/img/airflow_erd.sha256
index 4d679e27175..e1bb8bb1931 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-805d79090c38fabc03b704d77a093094758349a13659a334262d5a7afc2e7e45
\ No newline at end of file
+95b59583f58fa1dad1ae07cac699b0a72143abae9beaa33f79c3ff0f24e52c7d
\ No newline at end of file
diff --git a/providers/tests/openlineage/plugins/test_utils.py
b/providers/tests/openlineage/plugins/test_utils.py
index 475624fef8f..8f786cb4c4b 100644
--- a/providers/tests/openlineage/plugins/test_utils.py
+++ b/providers/tests/openlineage/plugins/test_utils.py
@@ -370,7 +370,7 @@ def test_serialize_timetable():
{
"__type": DagAttributeTypes.ASSET_ALIAS,
"name": "another",
- "group": "",
+ "group": "asset",
},
{
"__type": DagAttributeTypes.ASSET,
diff --git a/task_sdk/src/airflow/sdk/definitions/asset/__init__.py
b/task_sdk/src/airflow/sdk/definitions/asset/__init__.py
index 7ea61e905f5..9f5b85ccb16 100644
--- a/task_sdk/src/airflow/sdk/definitions/asset/__init__.py
+++ b/task_sdk/src/airflow/sdk/definitions/asset/__init__.py
@@ -391,10 +391,10 @@ class Model(Asset):
@attrs.define(unsafe_hash=False)
class AssetAlias(BaseAsset):
- """A represeation of asset alias which is used to create asset during the
runtime."""
+ """A representation of asset alias which is used to create asset during
the runtime."""
name: str = attrs.field(validator=_validate_non_empty_identifier)
- group: str = attrs.field(kw_only=True, default="",
validator=_validate_identifier)
+ group: str = attrs.field(kw_only=True, default="asset",
validator=_validate_identifier)
def _resolve_assets(self) -> list[Asset]:
from airflow.models.asset import expand_alias_to_assets