This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 313be641ed2 Redact extra fields for Asset Endpoints in fastAPI (#44069)
313be641ed2 is described below

commit 313be641ed260cfcdbe436cdfe2d843128abad26
Author: vatsrahul1001 <[email protected]>
AuthorDate: Mon Nov 18 19:32:30 2024 +0530

    Redact extra fields for Asset Endpoints in fastAPI (#44069)
    
    * redact extra fields in asset endpoints for fast api
    
    * redact extra fields in asset endpoints for fast api
    
    * updating test name correctly
    
    * removing duplicated time_freezer
---
 airflow/api_fastapi/core_api/datamodels/assets.py  |  12 ++
 .../core_api/routes/public/test_assets.py          | 145 +++++++++++++++++++++
 2 files changed, 157 insertions(+)

diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py b/airflow/api_fastapi/core_api/datamodels/assets.py
index bfdbb2d7fc8..94ec17ad63d 100644
--- a/airflow/api_fastapi/core_api/datamodels/assets.py
+++ b/airflow/api_fastapi/core_api/datamodels/assets.py
@@ -21,6 +21,8 @@ from datetime import datetime
 
 from pydantic import BaseModel, Field, field_validator
 
+from airflow.utils.log.secrets_masker import redact
+
 
 class DagScheduleAssetReference(BaseModel):
     """DAG schedule reference serializer for assets."""
@@ -58,6 +60,11 @@ class AssetResponse(BaseModel):
     producing_tasks: list[TaskOutletAssetReference]
     aliases: list[AssetAliasSchema]
 
+    @field_validator("extra", mode="after")
+    @classmethod
+    def redact_extra(cls, v: dict):
+        return redact(v)
+
 
 class AssetCollectionResponse(BaseModel):
     """Asset collection response."""
@@ -93,6 +100,11 @@ class AssetEventResponse(BaseModel):
     created_dagruns: list[DagRunAssetReference]
     timestamp: datetime
 
+    @field_validator("extra", mode="after")
+    @classmethod
+    def redact_extra(cls, v: dict):
+        return redact(v)
+
 
 class AssetEventCollectionResponse(BaseModel):
     """Asset event collection response."""
diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py
index 4a3e21035c3..9f494f53c80 100644
--- a/tests/api_fastapi/core_api/routes/public/test_assets.py
+++ b/tests/api_fastapi/core_api/routes/public/test_assets.py
@@ -59,6 +59,22 @@ def _create_assets(session, num: int = 2) -> None:
     session.commit()
 
 
+def _create_assets_with_sensitive_extra(session, num: int = 2) -> None:
+    default_time = "2020-06-11T18:00:00+00:00"
+    assets = [
+        AssetModel(
+            id=i,
+            uri=f"s3://bucket/key/{i}",
+            extra={"password": "bar"},
+            created_at=timezone.parse(default_time),
+            updated_at=timezone.parse(default_time),
+        )
+        for i in range(1, 1 + num)
+    ]
+    session.add_all(assets)
+    session.commit()
+
+
 def _create_provided_asset(session, asset: AssetModel) -> None:
     session.add(asset)
     session.commit()
@@ -82,6 +98,24 @@ def _create_assets_events(session, num: int = 2) -> None:
     session.commit()
 
 
+def _create_assets_events_with_sensitive_extra(session, num: int = 2) -> None:
+    default_time = "2020-06-11T18:00:00+00:00"
+    assets_events = [
+        AssetEvent(
+            id=i,
+            asset_id=i,
+            extra={"password": "bar"},
+            source_task_id="source_task_id",
+            source_dag_id="source_dag_id",
+            source_run_id=f"source_run_id_{i}",
+            timestamp=timezone.parse(default_time),
+        )
+        for i in range(1, 1 + num)
+    ]
+    session.add_all(assets_events)
+    session.commit()
+
+
 def _create_provided_asset_event(session, asset_event: AssetEvent) -> None:
     session.add(asset_event)
     session.commit()
@@ -142,6 +176,10 @@ class TestAssets:
     def create_assets(self, session, num: int = 2):
         _create_assets(session=session, num=num)
 
+    @provide_session
+    def create_assets_with_sensitive_extra(self, session, num: int = 2):
+        _create_assets_with_sensitive_extra(session=session, num=num)
+
     @provide_session
     def create_provided_asset(self, session, asset: AssetModel):
         _create_provided_asset(session=session, asset=asset)
@@ -150,6 +188,10 @@ class TestAssets:
     def create_assets_events(self, session, num: int = 2):
         _create_assets_events(session=session, num=num)
 
+    @provide_session
+    def create_assets_events_with_sensitive_extra(self, session, num: int = 2):
+        _create_assets_events_with_sensitive_extra(session=session, num=num)
+
     @provide_session
     def create_provided_asset_event(self, session, asset_event: AssetEvent):
         _create_provided_asset_event(session=session, asset_event=asset_event)
@@ -439,6 +481,68 @@ class TestGetAssetEvents(TestAssets):
         asset_uris = [asset["uri"] for asset in response.json()["asset_events"]]
         assert asset_uris == expected_asset_uris
 
+    @pytest.mark.usefixtures("time_freezer")
+    @pytest.mark.enable_redact
+    def test_should_mask_sensitive_extra(self, test_client, session):
+        self.create_assets_with_sensitive_extra()
+        self.create_assets_events_with_sensitive_extra()
+        self.create_dag_run()
+        self.create_asset_dag_run()
+        response = test_client.get("/public/assets/events")
+        assert response.status_code == 200
+        response_data = response.json()
+        assert response_data == {
+            "asset_events": [
+                {
+                    "id": 1,
+                    "asset_id": 1,
+                    "uri": "s3://bucket/key/1",
+                    "extra": {"password": "***"},
+                    "source_task_id": "source_task_id",
+                    "source_dag_id": "source_dag_id",
+                    "source_run_id": "source_run_id_1",
+                    "source_map_index": -1,
+                    "created_dagruns": [
+                        {
+                            "run_id": "source_run_id_1",
+                            "dag_id": "source_dag_id",
+                            "logical_date": "2020-06-11T18:00:00Z",
+                            "start_date": "2020-06-11T18:00:00Z",
+                            "end_date": "2020-06-11T18:00:00Z",
+                            "state": "success",
+                            "data_interval_start": "2020-06-11T18:00:00Z",
+                            "data_interval_end": "2020-06-11T18:00:00Z",
+                        }
+                    ],
+                    "timestamp": "2020-06-11T18:00:00Z",
+                },
+                {
+                    "id": 2,
+                    "asset_id": 2,
+                    "uri": "s3://bucket/key/2",
+                    "extra": {"password": "***"},
+                    "source_task_id": "source_task_id",
+                    "source_dag_id": "source_dag_id",
+                    "source_run_id": "source_run_id_2",
+                    "source_map_index": -1,
+                    "created_dagruns": [
+                        {
+                            "run_id": "source_run_id_2",
+                            "dag_id": "source_dag_id",
+                            "logical_date": "2020-06-11T18:00:00Z",
+                            "start_date": "2020-06-11T18:00:00Z",
+                            "end_date": "2020-06-11T18:00:00Z",
+                            "state": "success",
+                            "data_interval_start": "2020-06-11T18:00:00Z",
+                            "data_interval_end": "2020-06-11T18:00:00Z",
+                        }
+                    ],
+                    "timestamp": "2020-06-11T18:00:00Z",
+                },
+            ],
+            "total_entries": 2,
+        }
+
 
 class TestGetAssetEndpoint(TestAssets):
     @pytest.mark.parametrize(
@@ -478,6 +582,27 @@ class TestGetAssetEndpoint(TestAssets):
         assert response.status_code == 404
         assert response.json()["detail"] == "The Asset with uri: `s3://bucket/key` was not found"
 
+    @pytest.mark.usefixtures("time_freezer")
+    @pytest.mark.enable_redact
+    def test_should_mask_sensitive_extra(self, test_client, session):
+        self.create_assets_with_sensitive_extra()
+        tz_datetime_format = self.default_time.replace("+00:00", "Z")
+        uri = "s3://bucket/key/1"
+        response = test_client.get(
+            f"/public/assets/{uri}",
+        )
+        assert response.status_code == 200
+        assert response.json() == {
+            "id": 1,
+            "uri": "s3://bucket/key/1",
+            "extra": {"password": "***"},
+            "created_at": tz_datetime_format,
+            "updated_at": tz_datetime_format,
+            "consuming_dags": [],
+            "producing_tasks": [],
+            "aliases": [],
+        }
+
 
 class TestQueuedEventEndpoint(TestAssets):
     def _create_asset_dag_run_queues(self, dag_id, asset_id, session):
@@ -593,3 +718,23 @@ class TestPostAssetEvents(TestAssets):
         response = test_client.post("/public/assets/events", json=event_invalid_payload)
 
         assert response.status_code == 422
+
+    @pytest.mark.usefixtures("time_freezer")
+    @pytest.mark.enable_redact
+    def test_should_mask_sensitive_extra(self, test_client, session):
+        self.create_assets()
+        event_payload = {"uri": "s3://bucket/key/1", "extra": {"password": "bar"}}
+        response = test_client.post("/public/assets/events", json=event_payload)
+        assert response.status_code == 200
+        assert response.json() == {
+            "id": mock.ANY,
+            "asset_id": 1,
+            "uri": "s3://bucket/key/1",
+            "extra": {"password": "***", "from_rest_api": True},
+            "source_task_id": None,
+            "source_dag_id": None,
+            "source_run_id": None,
+            "source_map_index": -1,
+            "created_dagruns": [],
+            "timestamp": self.default_time.replace("+00:00", "Z"),
+        }

Reply via email to