This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7f20b1eed8 Extend get_datasets endpoint to include dataset aliases (#40830)
7f20b1eed8 is described below
commit 7f20b1eed8e05e6c31e4632a721869841ca8412b
Author: Wei Lee <[email protected]>
AuthorDate: Tue Jul 23 14:36:37 2024 +0800
Extend get_datasets endpoint to include dataset aliases (#40830)
---
airflow/api_connexion/schemas/dataset_schema.py | 14 ++++++++++++++
tests/api_connexion/endpoints/test_dataset_endpoint.py | 7 +++++--
tests/api_connexion/schemas/test_dataset_schema.py | 14 +++++++++++++-
tests/www/views/test_views_dataset.py | 2 +-
4 files changed, 33 insertions(+), 4 deletions(-)
diff --git a/airflow/api_connexion/schemas/dataset_schema.py b/airflow/api_connexion/schemas/dataset_schema.py
index fc80ccc672..b8aaf2f8fa 100644
--- a/airflow/api_connexion/schemas/dataset_schema.py
+++ b/airflow/api_connexion/schemas/dataset_schema.py
@@ -26,6 +26,7 @@ from airflow.api_connexion.schemas.common_schema import JsonObjectField
from airflow.models.dagrun import DagRun
from airflow.models.dataset import (
DagScheduleDatasetReference,
+ DatasetAliasModel,
DatasetEvent,
DatasetModel,
TaskOutletDatasetReference,
@@ -59,6 +60,18 @@ class DagScheduleDatasetReferenceSchema(SQLAlchemySchema):
updated_at = auto_field()
+class DatasetAliasSchema(SQLAlchemySchema):
+ """DatasetAlias DB schema."""
+
+ class Meta:
+ """Meta."""
+
+ model = DatasetAliasModel
+
+ id = auto_field()
+ name = auto_field()
+
+
class DatasetSchema(SQLAlchemySchema):
"""Dataset DB schema."""
@@ -74,6 +87,7 @@ class DatasetSchema(SQLAlchemySchema):
updated_at = auto_field()
producing_tasks = fields.List(fields.Nested(TaskOutletDatasetReferenceSchema))
consuming_dags = fields.List(fields.Nested(DagScheduleDatasetReferenceSchema))
+ aliases = fields.List(fields.Nested(DatasetAliasSchema))
class DatasetCollection(NamedTuple):
diff --git a/tests/api_connexion/endpoints/test_dataset_endpoint.py b/tests/api_connexion/endpoints/test_dataset_endpoint.py
index 5b6e2f2414..fa278a92d6 100644
--- a/tests/api_connexion/endpoints/test_dataset_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dataset_endpoint.py
@@ -109,7 +109,7 @@ class TestGetDatasetEndpoint(TestDatasetEndpoint):
self._create_dataset(session)
assert session.query(DatasetModel).count() == 1
- with assert_queries_count(5):
+ with assert_queries_count(6):
response = self.client.get(
f"/api/v1/datasets/{urllib.parse.quote('s3://bucket/key', safe='')}",
environ_overrides={"REMOTE_USER": "test"},
@@ -123,6 +123,7 @@ class TestGetDatasetEndpoint(TestDatasetEndpoint):
"updated_at": self.default_time,
"consuming_dags": [],
"producing_tasks": [],
+ "aliases": [],
}
def test_should_respond_404(self):
@@ -176,7 +177,7 @@ class TestGetDatasets(TestDatasetEndpoint):
session.commit()
assert session.query(DatasetModel).count() == 2
- with assert_queries_count(8):
+ with assert_queries_count(10):
response = self.client.get("/api/v1/datasets", environ_overrides={"REMOTE_USER": "test"})
assert response.status_code == 200
@@ -191,6 +192,7 @@ class TestGetDatasets(TestDatasetEndpoint):
"updated_at": self.default_time,
"consuming_dags": [],
"producing_tasks": [],
+ "aliases": [],
},
{
"id": 2,
@@ -200,6 +202,7 @@ class TestGetDatasets(TestDatasetEndpoint):
"updated_at": self.default_time,
"consuming_dags": [],
"producing_tasks": [],
+ "aliases": [],
},
],
"total_entries": 2,
diff --git a/tests/api_connexion/schemas/test_dataset_schema.py b/tests/api_connexion/schemas/test_dataset_schema.py
index 2a88dd9865..087fb8a840 100644
--- a/tests/api_connexion/schemas/test_dataset_schema.py
+++ b/tests/api_connexion/schemas/test_dataset_schema.py
@@ -28,7 +28,7 @@ from airflow.api_connexion.schemas.dataset_schema import (
dataset_schema,
)
from airflow.datasets import Dataset
-from airflow.models.dataset import DatasetEvent, DatasetModel
+from airflow.models.dataset import DatasetAliasModel, DatasetEvent, DatasetModel
from airflow.operators.empty import EmptyOperator
from tests.test_utils.db import clear_db_dags, clear_db_datasets
@@ -87,6 +87,7 @@ class TestDatasetSchema(TestDatasetSchemaBase):
"updated_at": self.timestamp,
}
],
+ "aliases": [],
}
@@ -99,13 +100,19 @@ class TestDatasetCollectionSchema(TestDatasetSchemaBase):
)
for i in range(2)
]
+        dataset_aliases = [DatasetAliasModel(name=f"alias_{i}") for i in range(2)]
+ for dataset_alias in dataset_aliases:
+ dataset_alias.datasets.append(datasets[0])
session.add_all(datasets)
+ session.add_all(dataset_aliases)
session.flush()
serialized_data = dataset_collection_schema.dump(
DatasetCollection(datasets=datasets, total_entries=2)
)
serialized_data["datasets"][0]["id"] = 1
serialized_data["datasets"][1]["id"] = 2
+ serialized_data["datasets"][0]["aliases"][0]["id"] = 1
+ serialized_data["datasets"][0]["aliases"][1]["id"] = 2
assert serialized_data == {
"datasets": [
{
@@ -116,6 +123,10 @@ class TestDatasetCollectionSchema(TestDatasetSchemaBase):
"updated_at": self.timestamp,
"consuming_dags": [],
"producing_tasks": [],
+ "aliases": [
+ {"id": 1, "name": "alias_0"},
+ {"id": 2, "name": "alias_1"},
+ ],
},
{
"id": 2,
@@ -125,6 +136,7 @@ class TestDatasetCollectionSchema(TestDatasetSchemaBase):
"updated_at": self.timestamp,
"consuming_dags": [],
"producing_tasks": [],
+ "aliases": [],
},
],
"total_entries": 2,
diff --git a/tests/www/views/test_views_dataset.py b/tests/www/views/test_views_dataset.py
index d67ed80f38..797ed40ba0 100644
--- a/tests/www/views/test_views_dataset.py
+++ b/tests/www/views/test_views_dataset.py
@@ -51,7 +51,7 @@ class TestGetDatasets(TestDatasetEndpoint):
session.commit()
assert session.query(DatasetModel).count() == 2
- with assert_queries_count(8):
+ with assert_queries_count(10):
response = admin_client.get("/object/datasets_summary")
assert response.status_code == 200