This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9b17ff3aa3 Add post endpoint for dataset events (#37570)
9b17ff3aa3 is described below

commit 9b17ff3aa309ba07ef94238ad3465f074df1840a
Author: Eduardo Nicastro <[email protected]>
AuthorDate: Mon Feb 26 04:01:52 2024 -0300

    Add post endpoint for dataset events (#37570)
    
    Co-authored-by: Wei Lee <[email protected]>
    Co-authored-by: Tzu-ping Chung <[email protected]>
---
 .../api_connexion/endpoints/dataset_endpoint.py    | 49 +++++++++++++++++-
 airflow/api_connexion/openapi/v1.yaml              | 59 ++++++++++++++++++----
 airflow/api_connexion/schemas/dataset_schema.py    |  8 +++
 airflow/datasets/manager.py                        | 37 +++++++++-----
 .../fab/auth_manager/security_manager/override.py  |  2 +
 airflow/www/static/js/components/Table/Cells.tsx   |  3 ++
 airflow/www/static/js/types/api-generated.ts       | 59 +++++++++++++---------
 .../endpoints/test_dataset_endpoint.py             | 49 ++++++++++++++++++
 tests/api_connexion/schemas/test_dataset_schema.py | 31 ++++++++++++
 9 files changed, 250 insertions(+), 47 deletions(-)

diff --git a/airflow/api_connexion/endpoints/dataset_endpoint.py b/airflow/api_connexion/endpoints/dataset_endpoint.py
index bf83c857d7..91c41264a5 100644
--- a/airflow/api_connexion/endpoints/dataset_endpoint.py
+++ b/airflow/api_connexion/endpoints/dataset_endpoint.py
@@ -20,11 +20,13 @@ from http import HTTPStatus
 from typing import TYPE_CHECKING
 
 from connexion import NoContent
+from marshmallow import ValidationError
 from sqlalchemy import delete, func, select
 from sqlalchemy.orm import joinedload, subqueryload
 
 from airflow.api_connexion import security
-from airflow.api_connexion.exceptions import NotFound
+from airflow.api_connexion.endpoints.request_dict import get_json_request_dict
+from airflow.api_connexion.exceptions import BadRequest, NotFound
 from airflow.api_connexion.parameters import apply_sorting, check_limit, format_datetime, format_parameters
 from airflow.api_connexion.schemas.dataset_schema import (
     DagScheduleDatasetReference,
@@ -33,15 +35,23 @@ from airflow.api_connexion.schemas.dataset_schema import (
     QueuedEvent,
     QueuedEventCollection,
     TaskOutletDatasetReference,
+    create_dataset_event_schema,
     dataset_collection_schema,
     dataset_event_collection_schema,
+    dataset_event_schema,
     dataset_schema,
     queued_event_collection_schema,
     queued_event_schema,
 )
+from airflow.datasets import Dataset
+from airflow.datasets.manager import dataset_manager
 from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel
+from airflow.security import permissions
+from airflow.utils import timezone
 from airflow.utils.db import get_query_count
+from airflow.utils.log.action_logger import action_event_from_permission
 from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.www.decorators import action_logging
 from airflow.www.extensions.init_auth_manager import get_auth_manager
 
 if TYPE_CHECKING:
@@ -49,6 +59,8 @@ if TYPE_CHECKING:
 
     from airflow.api_connexion.types import APIResponse
 
+RESOURCE_EVENT_PREFIX = "dataset"
+
 
 @security.requires_access_dataset("GET")
 @provide_session
@@ -311,3 +323,38 @@ def delete_dataset_queued_events(
         "Queue event not found",
         detail=f"Queue event with dataset uri: `{uri}` was not found",
     )
+
+
+@security.requires_access_dataset("POST")
+@provide_session
+@action_logging(
+    event=action_event_from_permission(
+        prefix=RESOURCE_EVENT_PREFIX,
+        permission=permissions.ACTION_CAN_CREATE,
+    ),
+)
+def create_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
+    """Create dataset event."""
+    body = get_json_request_dict()
+    try:
+        json_body = create_dataset_event_schema.load(body)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+
+    uri = json_body["dataset_uri"]
+    dataset = session.scalar(select(DatasetModel).where(DatasetModel.uri == uri).limit(1))
+    if not dataset:
+        raise NotFound(title="Dataset not found", detail=f"Dataset with uri: '{uri}' not found")
+    timestamp = timezone.utcnow()
+    extra = json_body.get("extra", {})
+    extra["from_rest_api"] = True
+    dataset_event = dataset_manager.register_dataset_change(
+        dataset=Dataset(uri),
+        timestamp=timestamp,
+        extra=extra,
+        session=session,
+    )
+    if not dataset_event:
+        raise NotFound(title="Dataset not found", detail=f"Dataset with uri: '{uri}' not found")
+    event = dataset_event_schema.dump(dataset_event)
+    return event
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index d2ccf87d07..db79d9b187 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -2129,21 +2129,21 @@ paths:
           $ref: "#/components/responses/NotFound"
 
   /datasets/events:
-    parameters:
-      - $ref: "#/components/parameters/PageLimit"
-      - $ref: "#/components/parameters/PageOffset"
-      - $ref: "#/components/parameters/OrderBy"
-      - $ref: "#/components/parameters/FilterDatasetID"
-      - $ref: "#/components/parameters/FilterSourceDAGID"
-      - $ref: "#/components/parameters/FilterSourceTaskID"
-      - $ref: "#/components/parameters/FilterSourceRunID"
-      - $ref: "#/components/parameters/FilterSourceMapIndex"
     get:
       summary: Get dataset events
       description: Get dataset events
       x-openapi-router-controller: airflow.api_connexion.endpoints.dataset_endpoint
       operationId: get_dataset_events
       tags: [Dataset]
+      parameters:
+        - $ref: "#/components/parameters/PageLimit"
+        - $ref: "#/components/parameters/PageOffset"
+        - $ref: "#/components/parameters/OrderBy"
+        - $ref: "#/components/parameters/FilterDatasetID"
+        - $ref: "#/components/parameters/FilterSourceDAGID"
+        - $ref: "#/components/parameters/FilterSourceTaskID"
+        - $ref: "#/components/parameters/FilterSourceRunID"
+        - $ref: "#/components/parameters/FilterSourceMapIndex"
       responses:
         "200":
           description: Success.
@@ -2157,6 +2157,33 @@ paths:
           $ref: "#/components/responses/PermissionDenied"
         "404":
           $ref: "#/components/responses/NotFound"
+    post:
+      summary: Create dataset event
+      description: Create dataset event
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dataset_endpoint
+      operationId: create_dataset_event
+      tags: [Dataset]
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/CreateDatasetEvent'
+      responses:
+        '200':
+          description: Success.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/DatasetEvent'
+        "400":
+          $ref: "#/components/responses/BadRequest"
+        '401':
+          $ref: '#/components/responses/Unauthenticated'
+        '403':
+          $ref: '#/components/responses/PermissionDenied'
+        '404':
+          $ref: '#/components/responses/NotFound'
 
   /config:
     get:
@@ -4290,6 +4317,20 @@ components:
           description: The dataset event creation time
           nullable: false
 
+    CreateDatasetEvent:
+      type: object
+      required:
+        - dataset_uri
+      properties:
+        dataset_uri:
+          type: string
+          description: The URI of the dataset
+          nullable: false
+        extra:
+          type: object
+          description: The dataset event extra
+          nullable: true
+
     QueuedEvent:
       type: object
       properties:
diff --git a/airflow/api_connexion/schemas/dataset_schema.py b/airflow/api_connexion/schemas/dataset_schema.py
index a7999c8dfc..fc80ccc672 100644
--- a/airflow/api_connexion/schemas/dataset_schema.py
+++ b/airflow/api_connexion/schemas/dataset_schema.py
@@ -147,8 +147,16 @@ class DatasetEventCollectionSchema(Schema):
     total_entries = fields.Int()
 
 
+class CreateDatasetEventSchema(Schema):
+    """Create Dataset Event Schema."""
+
+    dataset_uri = fields.String()
+    extra = JsonObjectField()
+
+
 dataset_event_schema = DatasetEventSchema()
 dataset_event_collection_schema = DatasetEventCollectionSchema()
+create_dataset_event_schema = CreateDatasetEventSchema()
 
 
 class QueuedEvent(NamedTuple):
diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py
index 047f8494a4..e0c2db9e1f 100644
--- a/airflow/datasets/manager.py
+++ b/airflow/datasets/manager.py
@@ -56,8 +56,14 @@ class DatasetManager(LoggingMixin):
             self.notify_dataset_created(dataset=Dataset(uri=dataset_model.uri, extra=dataset_model.extra))
 
     def register_dataset_change(
-        self, *, task_instance: TaskInstance, dataset: Dataset, extra=None, session: Session, **kwargs
-    ) -> None:
+        self,
+        *,
+        task_instance: TaskInstance | None = None,
+        dataset: Dataset,
+        extra=None,
+        session: Session,
+        **kwargs,
+    ) -> DatasetEvent | None:
         """
         Register dataset related changes.
 
@@ -71,17 +77,23 @@ class DatasetManager(LoggingMixin):
         )
         if not dataset_model:
             self.log.warning("DatasetModel %s not found", dataset)
-            return
-        session.add(
-            DatasetEvent(
-                dataset_id=dataset_model.id,
-                source_task_id=task_instance.task_id,
-                source_dag_id=task_instance.dag_id,
-                source_run_id=task_instance.run_id,
-                source_map_index=task_instance.map_index,
-                extra=extra,
+            return None
+
+        event_kwargs = {
+            "dataset_id": dataset_model.id,
+            "extra": extra,
+        }
+        if task_instance:
+            event_kwargs.update(
+                {
+                    "source_task_id": task_instance.task_id,
+                    "source_dag_id": task_instance.dag_id,
+                    "source_run_id": task_instance.run_id,
+                    "source_map_index": task_instance.map_index,
+                }
             )
-        )
+        dataset_event = DatasetEvent(**event_kwargs)
+        session.add(dataset_event)
         session.flush()
 
         self.notify_dataset_changed(dataset=dataset)
@@ -90,6 +102,7 @@ class DatasetManager(LoggingMixin):
         if dataset_model.consuming_dags:
             self._queue_dagruns(dataset_model, session)
         session.flush()
+        return dataset_event
 
     def notify_dataset_created(self, dataset: Dataset):
         """Run applicable notification actions when a dataset is created."""
diff --git a/airflow/providers/fab/auth_manager/security_manager/override.py b/airflow/providers/fab/auth_manager/security_manager/override.py
index 02693a0a74..fe3dfa382a 100644
--- a/airflow/providers/fab/auth_manager/security_manager/override.py
+++ b/airflow/providers/fab/auth_manager/security_manager/override.py
@@ -248,6 +248,7 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
         (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DAG_RUN),
         (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
         (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET),
     ]
     # [END security_user_perms]
 
@@ -275,6 +276,7 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
         (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_VARIABLE),
         (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_XCOM),
         (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DATASET),
+        (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET),
     ]
     # [END security_op_perms]
 
diff --git a/airflow/www/static/js/components/Table/Cells.tsx b/airflow/www/static/js/components/Table/Cells.tsx
index 346acea91e..9f75e0e484 100644
--- a/airflow/www/static/js/components/Table/Cells.tsx
+++ b/airflow/www/static/js/components/Table/Cells.tsx
@@ -150,6 +150,9 @@ export const TaskInstanceLink = ({ cell: { value, row } }: CellProps) => {
   const { sourceRunId, sourceDagId, sourceMapIndex } = row.original;
   const gridUrl = getMetaValue("grid_url");
   const dagId = getMetaValue("dag_id");
+  if (!value || !sourceRunId || !sourceDagId || !gridUrl) {
+    return null;
+  }
   const stringToReplace = dagId || "__DAG_ID__";
   const url = `${gridUrl?.replace(
     stringToReplace,
diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts
index 9ed3dfa11e..c64bc2a9ee 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -684,31 +684,8 @@ export interface paths {
   "/datasets/events": {
     /** Get dataset events */
     get: operations["get_dataset_events"];
-    parameters: {
-      query: {
-        /** The numbers of items to return. */
-        limit?: components["parameters"]["PageLimit"];
-        /** The number of items to skip before starting to collect the result set. */
-        offset?: components["parameters"]["PageOffset"];
-        /**
-         * The name of the field to order the results by.
-         * Prefix a field name with `-` to reverse the sort order.
-         *
-         * *New in version 2.1.0*
-         */
-        order_by?: components["parameters"]["OrderBy"];
-        /** The Dataset ID that updated the dataset. */
-        dataset_id?: components["parameters"]["FilterDatasetID"];
-        /** The DAG ID that updated the dataset. */
-        source_dag_id?: components["parameters"]["FilterSourceDAGID"];
-        /** The task ID that updated the dataset. */
-        source_task_id?: components["parameters"]["FilterSourceTaskID"];
-        /** The DAG run ID that updated the dataset. */
-        source_run_id?: components["parameters"]["FilterSourceRunID"];
-        /** The map index that updated the dataset. */
-        source_map_index?: components["parameters"]["FilterSourceMapIndex"];
-      };
-    };
+    /** Create dataset event */
+    post: operations["create_dataset_event"];
   };
   "/config": {
     get: operations["get_config"];
@@ -1825,6 +1802,12 @@ export interface components {
       /** @description The dataset event creation time */
       timestamp?: string;
     };
+    CreateDatasetEvent: {
+      /** @description The URI of the dataset */
+      dataset_uri: string;
+      /** @description The dataset event extra */
+      extra?: { [key: string]: unknown } | null;
+    };
     QueuedEvent: {
       /** @description The datata uri. */
       uri?: string;
@@ -4598,6 +4581,26 @@ export interface operations {
       404: components["responses"]["NotFound"];
     };
   };
+  /** Create dataset event */
+  create_dataset_event: {
+    responses: {
+      /** Success. */
+      200: {
+        content: {
+          "application/json": components["schemas"]["DatasetEvent"];
+        };
+      };
+      400: components["responses"]["BadRequest"];
+      401: components["responses"]["Unauthenticated"];
+      403: components["responses"]["PermissionDenied"];
+      404: components["responses"]["NotFound"];
+    };
+    requestBody: {
+      content: {
+        "application/json": components["schemas"]["CreateDatasetEvent"];
+      };
+    };
+  };
   get_config: {
     parameters: {
       query: {
@@ -5167,6 +5170,9 @@ export type DatasetCollection = CamelCasedPropertiesDeep<
 export type DatasetEvent = CamelCasedPropertiesDeep<
   components["schemas"]["DatasetEvent"]
 >;
+export type CreateDatasetEvent = CamelCasedPropertiesDeep<
+  components["schemas"]["CreateDatasetEvent"]
+>;
 export type QueuedEvent = CamelCasedPropertiesDeep<
   components["schemas"]["QueuedEvent"]
 >;
@@ -5481,6 +5487,9 @@ export type GetDatasetVariables = CamelCasedPropertiesDeep<
 export type GetDatasetEventsVariables = CamelCasedPropertiesDeep<
   operations["get_dataset_events"]["parameters"]["query"]
 >;
+export type CreateDatasetEventVariables = CamelCasedPropertiesDeep<
+  operations["create_dataset_event"]["requestBody"]["content"]["application/json"]
+>;
 export type GetConfigVariables = CamelCasedPropertiesDeep<
   operations["get_config"]["parameters"]["query"]
 >;
diff --git a/tests/api_connexion/endpoints/test_dataset_endpoint.py b/tests/api_connexion/endpoints/test_dataset_endpoint.py
index 801ce65c89..15f37b2269 100644
--- a/tests/api_connexion/endpoints/test_dataset_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dataset_endpoint.py
@@ -18,6 +18,7 @@ from __future__ import annotations
 
 import urllib
 from typing import Generator
+from unittest.mock import ANY
 
 import pytest
 import time_machine
@@ -53,6 +54,7 @@ def configured_app(minimal_app_for_api):
         role_name="Test",
         permissions=[
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET),
+            (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET),
         ],
     )
     create_user(app, username="test_no_permissions", role_name="TestNoPermissions")  # type: ignore
@@ -577,6 +579,53 @@ class TestGetDatasetEvents(TestDatasetEndpoint):
         }
 
 
+class TestPostDatasetEvents(TestDatasetEndpoint):
+    @pytest.fixture
+    def time_freezer(self) -> Generator:
+        freezer = time_machine.travel(self.default_time, tick=False)
+        freezer.start()
+
+        yield
+
+        freezer.stop()
+
+    @pytest.mark.usefixtures("time_freezer")
+    def test_should_respond_200(self, session):
+        self._create_dataset(session)
+        event_payload = {"dataset_uri": "s3://bucket/key", "extra": {"foo": "bar"}}
+        response = self.client.post(
+            "/api/v1/datasets/events", json=event_payload, environ_overrides={"REMOTE_USER": "test"}
+        )
+
+        assert response.status_code == 200
+        response_data = response.json
+        assert response_data == {
+            "id": ANY,
+            "created_dagruns": [],
+            "dataset_uri": event_payload["dataset_uri"],
+            "dataset_id": ANY,
+            "extra": {"foo": "bar", "from_rest_api": True},
+            "source_dag_id": None,
+            "source_task_id": None,
+            "source_run_id": None,
+            "source_map_index": -1,
+            "timestamp": self.default_time,
+        }
+
+    def test_order_by_raises_400_for_invalid_attr(self, session):
+        self._create_dataset(session)
+        event_invalid_payload = {"dataset_uri": "TEST_DATASET_URI", "extra": {"foo": "bar"}, "fake": {}}
+        response = self.client.post(
+            "/api/v1/datasets/events", json=event_invalid_payload, environ_overrides={"REMOTE_USER": "test"}
+        )
+        assert response.status_code == 400
+
+    def test_should_raises_401_unauthenticated(self, session):
+        self._create_dataset(session)
+        response = self.client.post("/api/v1/datasets/events", json={"dataset_uri": "TEST_DATASET_URI"})
+        assert_401(response)
+
+
 class TestGetDatasetEventsEndpointPagination(TestDatasetEndpoint):
     @pytest.mark.parametrize(
         "url, expected_event_runids",
diff --git a/tests/api_connexion/schemas/test_dataset_schema.py b/tests/api_connexion/schemas/test_dataset_schema.py
index 9dd0d7d7f7..2a88dd9865 100644
--- a/tests/api_connexion/schemas/test_dataset_schema.py
+++ b/tests/api_connexion/schemas/test_dataset_schema.py
@@ -162,6 +162,37 @@ class TestDatasetEventSchema(TestDatasetSchemaBase):
         }
 
 
+class TestDatasetEventCreateSchema(TestDatasetSchemaBase):
+    def test_serialize(self, session):
+        d = DatasetModel("s3://abc")
+        session.add(d)
+        session.commit()
+        event = DatasetEvent(
+            id=1,
+            dataset_id=d.id,
+            extra={"foo": "bar"},
+            source_dag_id=None,
+            source_task_id=None,
+            source_run_id=None,
+            source_map_index=-1,
+        )
+        session.add(event)
+        session.flush()
+        serialized_data = dataset_event_schema.dump(event)
+        assert serialized_data == {
+            "id": 1,
+            "dataset_id": d.id,
+            "dataset_uri": "s3://abc",
+            "extra": {"foo": "bar"},
+            "source_dag_id": None,
+            "source_task_id": None,
+            "source_run_id": None,
+            "source_map_index": -1,
+            "timestamp": self.timestamp,
+            "created_dagruns": [],
+        }
+
+
 class TestDatasetEventCollectionSchema(TestDatasetSchemaBase):
     def test_serialize(self, session):
         common = {

Reply via email to