This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 313be641ed2 Redact extra fields for Asset Endpoints in fastAPI (#44069)
313be641ed2 is described below
commit 313be641ed260cfcdbe436cdfe2d843128abad26
Author: vatsrahul1001 <[email protected]>
AuthorDate: Mon Nov 18 19:32:30 2024 +0530
Redact extra fields for Asset Endpoints in fastAPI (#44069)
* Redact extra fields in asset endpoints for FastAPI
* Redact extra fields in asset endpoints for FastAPI
* updating test name correctly
* removing duplicated time_freezer
---
airflow/api_fastapi/core_api/datamodels/assets.py | 12 ++
.../core_api/routes/public/test_assets.py | 145 +++++++++++++++++++++
2 files changed, 157 insertions(+)
diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py b/airflow/api_fastapi/core_api/datamodels/assets.py
index bfdbb2d7fc8..94ec17ad63d 100644
--- a/airflow/api_fastapi/core_api/datamodels/assets.py
+++ b/airflow/api_fastapi/core_api/datamodels/assets.py
@@ -21,6 +21,8 @@ from datetime import datetime
from pydantic import BaseModel, Field, field_validator
+from airflow.utils.log.secrets_masker import redact
+
class DagScheduleAssetReference(BaseModel):
"""DAG schedule reference serializer for assets."""
@@ -58,6 +60,11 @@ class AssetResponse(BaseModel):
producing_tasks: list[TaskOutletAssetReference]
aliases: list[AssetAliasSchema]
+ @field_validator("extra", mode="after")
+ @classmethod
+ def redact_extra(cls, v: dict):
+ return redact(v)
+
class AssetCollectionResponse(BaseModel):
"""Asset collection response."""
@@ -93,6 +100,11 @@ class AssetEventResponse(BaseModel):
created_dagruns: list[DagRunAssetReference]
timestamp: datetime
+ @field_validator("extra", mode="after")
+ @classmethod
+ def redact_extra(cls, v: dict):
+ return redact(v)
+
class AssetEventCollectionResponse(BaseModel):
"""Asset event collection response."""
diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py
index 4a3e21035c3..9f494f53c80 100644
--- a/tests/api_fastapi/core_api/routes/public/test_assets.py
+++ b/tests/api_fastapi/core_api/routes/public/test_assets.py
@@ -59,6 +59,22 @@ def _create_assets(session, num: int = 2) -> None:
session.commit()
+def _create_assets_with_sensitive_extra(session, num: int = 2) -> None:
+ default_time = "2020-06-11T18:00:00+00:00"
+ assets = [
+ AssetModel(
+ id=i,
+ uri=f"s3://bucket/key/{i}",
+ extra={"password": "bar"},
+ created_at=timezone.parse(default_time),
+ updated_at=timezone.parse(default_time),
+ )
+ for i in range(1, 1 + num)
+ ]
+ session.add_all(assets)
+ session.commit()
+
+
def _create_provided_asset(session, asset: AssetModel) -> None:
session.add(asset)
session.commit()
@@ -82,6 +98,24 @@ def _create_assets_events(session, num: int = 2) -> None:
session.commit()
+def _create_assets_events_with_sensitive_extra(session, num: int = 2) -> None:
+ default_time = "2020-06-11T18:00:00+00:00"
+ assets_events = [
+ AssetEvent(
+ id=i,
+ asset_id=i,
+ extra={"password": "bar"},
+ source_task_id="source_task_id",
+ source_dag_id="source_dag_id",
+ source_run_id=f"source_run_id_{i}",
+ timestamp=timezone.parse(default_time),
+ )
+ for i in range(1, 1 + num)
+ ]
+ session.add_all(assets_events)
+ session.commit()
+
+
def _create_provided_asset_event(session, asset_event: AssetEvent) -> None:
session.add(asset_event)
session.commit()
@@ -142,6 +176,10 @@ class TestAssets:
def create_assets(self, session, num: int = 2):
_create_assets(session=session, num=num)
+ @provide_session
+ def create_assets_with_sensitive_extra(self, session, num: int = 2):
+ _create_assets_with_sensitive_extra(session=session, num=num)
+
@provide_session
def create_provided_asset(self, session, asset: AssetModel):
_create_provided_asset(session=session, asset=asset)
@@ -150,6 +188,10 @@ class TestAssets:
def create_assets_events(self, session, num: int = 2):
_create_assets_events(session=session, num=num)
+ @provide_session
+ def create_assets_events_with_sensitive_extra(self, session, num: int = 2):
+ _create_assets_events_with_sensitive_extra(session=session, num=num)
+
@provide_session
def create_provided_asset_event(self, session, asset_event: AssetEvent):
_create_provided_asset_event(session=session, asset_event=asset_event)
@@ -439,6 +481,68 @@ class TestGetAssetEvents(TestAssets):
asset_uris = [asset["uri"] for asset in
response.json()["asset_events"]]
assert asset_uris == expected_asset_uris
+ @pytest.mark.usefixtures("time_freezer")
+ @pytest.mark.enable_redact
+ def test_should_mask_sensitive_extra(self, test_client, session):
+ self.create_assets_with_sensitive_extra()
+ self.create_assets_events_with_sensitive_extra()
+ self.create_dag_run()
+ self.create_asset_dag_run()
+ response = test_client.get("/public/assets/events")
+ assert response.status_code == 200
+ response_data = response.json()
+ assert response_data == {
+ "asset_events": [
+ {
+ "id": 1,
+ "asset_id": 1,
+ "uri": "s3://bucket/key/1",
+ "extra": {"password": "***"},
+ "source_task_id": "source_task_id",
+ "source_dag_id": "source_dag_id",
+ "source_run_id": "source_run_id_1",
+ "source_map_index": -1,
+ "created_dagruns": [
+ {
+ "run_id": "source_run_id_1",
+ "dag_id": "source_dag_id",
+ "logical_date": "2020-06-11T18:00:00Z",
+ "start_date": "2020-06-11T18:00:00Z",
+ "end_date": "2020-06-11T18:00:00Z",
+ "state": "success",
+ "data_interval_start": "2020-06-11T18:00:00Z",
+ "data_interval_end": "2020-06-11T18:00:00Z",
+ }
+ ],
+ "timestamp": "2020-06-11T18:00:00Z",
+ },
+ {
+ "id": 2,
+ "asset_id": 2,
+ "uri": "s3://bucket/key/2",
+ "extra": {"password": "***"},
+ "source_task_id": "source_task_id",
+ "source_dag_id": "source_dag_id",
+ "source_run_id": "source_run_id_2",
+ "source_map_index": -1,
+ "created_dagruns": [
+ {
+ "run_id": "source_run_id_2",
+ "dag_id": "source_dag_id",
+ "logical_date": "2020-06-11T18:00:00Z",
+ "start_date": "2020-06-11T18:00:00Z",
+ "end_date": "2020-06-11T18:00:00Z",
+ "state": "success",
+ "data_interval_start": "2020-06-11T18:00:00Z",
+ "data_interval_end": "2020-06-11T18:00:00Z",
+ }
+ ],
+ "timestamp": "2020-06-11T18:00:00Z",
+ },
+ ],
+ "total_entries": 2,
+ }
+
class TestGetAssetEndpoint(TestAssets):
@pytest.mark.parametrize(
@@ -478,6 +582,27 @@ class TestGetAssetEndpoint(TestAssets):
assert response.status_code == 404
assert response.json()["detail"] == "The Asset with uri:
`s3://bucket/key` was not found"
+ @pytest.mark.usefixtures("time_freezer")
+ @pytest.mark.enable_redact
+ def test_should_mask_sensitive_extra(self, test_client, session):
+ self.create_assets_with_sensitive_extra()
+ tz_datetime_format = self.default_time.replace("+00:00", "Z")
+ uri = "s3://bucket/key/1"
+ response = test_client.get(
+ f"/public/assets/{uri}",
+ )
+ assert response.status_code == 200
+ assert response.json() == {
+ "id": 1,
+ "uri": "s3://bucket/key/1",
+ "extra": {"password": "***"},
+ "created_at": tz_datetime_format,
+ "updated_at": tz_datetime_format,
+ "consuming_dags": [],
+ "producing_tasks": [],
+ "aliases": [],
+ }
+
class TestQueuedEventEndpoint(TestAssets):
def _create_asset_dag_run_queues(self, dag_id, asset_id, session):
@@ -593,3 +718,23 @@ class TestPostAssetEvents(TestAssets):
response = test_client.post("/public/assets/events",
json=event_invalid_payload)
assert response.status_code == 422
+
+ @pytest.mark.usefixtures("time_freezer")
+ @pytest.mark.enable_redact
+ def test_should_mask_sensitive_extra(self, test_client, session):
+ self.create_assets()
+ event_payload = {"uri": "s3://bucket/key/1", "extra": {"password":
"bar"}}
+ response = test_client.post("/public/assets/events",
json=event_payload)
+ assert response.status_code == 200
+ assert response.json() == {
+ "id": mock.ANY,
+ "asset_id": 1,
+ "uri": "s3://bucket/key/1",
+ "extra": {"password": "***", "from_rest_api": True},
+ "source_task_id": None,
+ "source_dag_id": None,
+ "source_run_id": None,
+ "source_map_index": -1,
+ "created_dagruns": [],
+ "timestamp": self.default_time.replace("+00:00", "Z"),
+ }