This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9b17ff3aa3 Add post endpoint for dataset events (#37570)
9b17ff3aa3 is described below
commit 9b17ff3aa309ba07ef94238ad3465f074df1840a
Author: Eduardo Nicastro <[email protected]>
AuthorDate: Mon Feb 26 04:01:52 2024 -0300
Add post endpoint for dataset events (#37570)
Co-authored-by: Wei Lee <[email protected]>
Co-authored-by: Tzu-ping Chung <[email protected]>
---
.../api_connexion/endpoints/dataset_endpoint.py | 49 +++++++++++++++++-
airflow/api_connexion/openapi/v1.yaml | 59 ++++++++++++++++++----
airflow/api_connexion/schemas/dataset_schema.py | 8 +++
airflow/datasets/manager.py | 37 +++++++++-----
.../fab/auth_manager/security_manager/override.py | 2 +
airflow/www/static/js/components/Table/Cells.tsx | 3 ++
airflow/www/static/js/types/api-generated.ts | 59 +++++++++++++---------
.../endpoints/test_dataset_endpoint.py | 49 ++++++++++++++++++
tests/api_connexion/schemas/test_dataset_schema.py | 31 ++++++++++++
9 files changed, 250 insertions(+), 47 deletions(-)
diff --git a/airflow/api_connexion/endpoints/dataset_endpoint.py
b/airflow/api_connexion/endpoints/dataset_endpoint.py
index bf83c857d7..91c41264a5 100644
--- a/airflow/api_connexion/endpoints/dataset_endpoint.py
+++ b/airflow/api_connexion/endpoints/dataset_endpoint.py
@@ -20,11 +20,13 @@ from http import HTTPStatus
from typing import TYPE_CHECKING
from connexion import NoContent
+from marshmallow import ValidationError
from sqlalchemy import delete, func, select
from sqlalchemy.orm import joinedload, subqueryload
from airflow.api_connexion import security
-from airflow.api_connexion.exceptions import NotFound
+from airflow.api_connexion.endpoints.request_dict import get_json_request_dict
+from airflow.api_connexion.exceptions import BadRequest, NotFound
from airflow.api_connexion.parameters import apply_sorting, check_limit,
format_datetime, format_parameters
from airflow.api_connexion.schemas.dataset_schema import (
DagScheduleDatasetReference,
@@ -33,15 +35,23 @@ from airflow.api_connexion.schemas.dataset_schema import (
QueuedEvent,
QueuedEventCollection,
TaskOutletDatasetReference,
+ create_dataset_event_schema,
dataset_collection_schema,
dataset_event_collection_schema,
+ dataset_event_schema,
dataset_schema,
queued_event_collection_schema,
queued_event_schema,
)
+from airflow.datasets import Dataset
+from airflow.datasets.manager import dataset_manager
from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent,
DatasetModel
+from airflow.security import permissions
+from airflow.utils import timezone
from airflow.utils.db import get_query_count
+from airflow.utils.log.action_logger import action_event_from_permission
from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.www.decorators import action_logging
from airflow.www.extensions.init_auth_manager import get_auth_manager
if TYPE_CHECKING:
@@ -49,6 +59,8 @@ if TYPE_CHECKING:
from airflow.api_connexion.types import APIResponse
+RESOURCE_EVENT_PREFIX = "dataset"
+
@security.requires_access_dataset("GET")
@provide_session
@@ -311,3 +323,38 @@ def delete_dataset_queued_events(
"Queue event not found",
detail=f"Queue event with dataset uri: `{uri}` was not found",
)
+
+
+@security.requires_access_dataset("POST")
+@provide_session
+@action_logging(
+ event=action_event_from_permission(
+ prefix=RESOURCE_EVENT_PREFIX,
+ permission=permissions.ACTION_CAN_CREATE,
+ ),
+)
+def create_dataset_event(session: Session = NEW_SESSION) -> APIResponse:
+ """Create dataset event."""
+ body = get_json_request_dict()
+ try:
+ json_body = create_dataset_event_schema.load(body)
+ except ValidationError as err:
+ raise BadRequest(detail=str(err))
+
+ uri = json_body["dataset_uri"]
+ dataset = session.scalar(select(DatasetModel).where(DatasetModel.uri == uri).limit(1))
+ if not dataset:
+ raise NotFound(title="Dataset not found", detail=f"Dataset with uri: '{uri}' not found")
+ timestamp = timezone.utcnow()
+ extra = json_body.get("extra", {})
+ extra["from_rest_api"] = True
+ dataset_event = dataset_manager.register_dataset_change(
+ dataset=Dataset(uri),
+ timestamp=timestamp,
+ extra=extra,
+ session=session,
+ )
+ if not dataset_event:
+ raise NotFound(title="Dataset not found", detail=f"Dataset with uri: '{uri}' not found")
+ event = dataset_event_schema.dump(dataset_event)
+ return event
diff --git a/airflow/api_connexion/openapi/v1.yaml
b/airflow/api_connexion/openapi/v1.yaml
index d2ccf87d07..db79d9b187 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -2129,21 +2129,21 @@ paths:
$ref: "#/components/responses/NotFound"
/datasets/events:
- parameters:
- - $ref: "#/components/parameters/PageLimit"
- - $ref: "#/components/parameters/PageOffset"
- - $ref: "#/components/parameters/OrderBy"
- - $ref: "#/components/parameters/FilterDatasetID"
- - $ref: "#/components/parameters/FilterSourceDAGID"
- - $ref: "#/components/parameters/FilterSourceTaskID"
- - $ref: "#/components/parameters/FilterSourceRunID"
- - $ref: "#/components/parameters/FilterSourceMapIndex"
get:
summary: Get dataset events
description: Get dataset events
x-openapi-router-controller:
airflow.api_connexion.endpoints.dataset_endpoint
operationId: get_dataset_events
tags: [Dataset]
+ parameters:
+ - $ref: "#/components/parameters/PageLimit"
+ - $ref: "#/components/parameters/PageOffset"
+ - $ref: "#/components/parameters/OrderBy"
+ - $ref: "#/components/parameters/FilterDatasetID"
+ - $ref: "#/components/parameters/FilterSourceDAGID"
+ - $ref: "#/components/parameters/FilterSourceTaskID"
+ - $ref: "#/components/parameters/FilterSourceRunID"
+ - $ref: "#/components/parameters/FilterSourceMapIndex"
responses:
"200":
description: Success.
@@ -2157,6 +2157,33 @@ paths:
$ref: "#/components/responses/PermissionDenied"
"404":
$ref: "#/components/responses/NotFound"
+ post:
+ summary: Create dataset event
+ description: Create dataset event
+ x-openapi-router-controller:
airflow.api_connexion.endpoints.dataset_endpoint
+ operationId: create_dataset_event
+ tags: [Dataset]
+ requestBody:
+ required: true
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/CreateDatasetEvent'
+ responses:
+ '200':
+ description: Success.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/DatasetEvent'
+ "400":
+ $ref: "#/components/responses/BadRequest"
+ '401':
+ $ref: '#/components/responses/Unauthenticated'
+ '403':
+ $ref: '#/components/responses/PermissionDenied'
+ '404':
+ $ref: '#/components/responses/NotFound'
/config:
get:
@@ -4290,6 +4317,20 @@ components:
description: The dataset event creation time
nullable: false
+ CreateDatasetEvent:
+ type: object
+ required:
+ - dataset_uri
+ properties:
+ dataset_uri:
+ type: string
+ description: The URI of the dataset
+ nullable: false
+ extra:
+ type: object
+ description: The dataset event extra
+ nullable: true
+
QueuedEvent:
type: object
properties:
diff --git a/airflow/api_connexion/schemas/dataset_schema.py
b/airflow/api_connexion/schemas/dataset_schema.py
index a7999c8dfc..fc80ccc672 100644
--- a/airflow/api_connexion/schemas/dataset_schema.py
+++ b/airflow/api_connexion/schemas/dataset_schema.py
@@ -147,8 +147,16 @@ class DatasetEventCollectionSchema(Schema):
total_entries = fields.Int()
+class CreateDatasetEventSchema(Schema):
+ """Create Dataset Event Schema."""
+
+ dataset_uri = fields.String()
+ extra = JsonObjectField()
+
+
dataset_event_schema = DatasetEventSchema()
dataset_event_collection_schema = DatasetEventCollectionSchema()
+create_dataset_event_schema = CreateDatasetEventSchema()
class QueuedEvent(NamedTuple):
diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py
index 047f8494a4..e0c2db9e1f 100644
--- a/airflow/datasets/manager.py
+++ b/airflow/datasets/manager.py
@@ -56,8 +56,14 @@ class DatasetManager(LoggingMixin):
self.notify_dataset_created(dataset=Dataset(uri=dataset_model.uri,
extra=dataset_model.extra))
def register_dataset_change(
- self, *, task_instance: TaskInstance, dataset: Dataset, extra=None,
session: Session, **kwargs
- ) -> None:
+ self,
+ *,
+ task_instance: TaskInstance | None = None,
+ dataset: Dataset,
+ extra=None,
+ session: Session,
+ **kwargs,
+ ) -> DatasetEvent | None:
"""
Register dataset related changes.
@@ -71,17 +77,23 @@ class DatasetManager(LoggingMixin):
)
if not dataset_model:
self.log.warning("DatasetModel %s not found", dataset)
- return
- session.add(
- DatasetEvent(
- dataset_id=dataset_model.id,
- source_task_id=task_instance.task_id,
- source_dag_id=task_instance.dag_id,
- source_run_id=task_instance.run_id,
- source_map_index=task_instance.map_index,
- extra=extra,
+ return None
+
+ event_kwargs = {
+ "dataset_id": dataset_model.id,
+ "extra": extra,
+ }
+ if task_instance:
+ event_kwargs.update(
+ {
+ "source_task_id": task_instance.task_id,
+ "source_dag_id": task_instance.dag_id,
+ "source_run_id": task_instance.run_id,
+ "source_map_index": task_instance.map_index,
+ }
)
- )
+ dataset_event = DatasetEvent(**event_kwargs)
+ session.add(dataset_event)
session.flush()
self.notify_dataset_changed(dataset=dataset)
@@ -90,6 +102,7 @@ class DatasetManager(LoggingMixin):
if dataset_model.consuming_dags:
self._queue_dagruns(dataset_model, session)
session.flush()
+ return dataset_event
def notify_dataset_created(self, dataset: Dataset):
"""Run applicable notification actions when a dataset is created."""
diff --git a/airflow/providers/fab/auth_manager/security_manager/override.py
b/airflow/providers/fab/auth_manager/security_manager/override.py
index 02693a0a74..fe3dfa382a 100644
--- a/airflow/providers/fab/auth_manager/security_manager/override.py
+++ b/airflow/providers/fab/auth_manager/security_manager/override.py
@@ -248,6 +248,7 @@ class
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DAG_RUN),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN),
+ (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET),
]
# [END security_user_perms]
@@ -275,6 +276,7 @@ class
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_VARIABLE),
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_XCOM),
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DATASET),
+ (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET),
]
# [END security_op_perms]
diff --git a/airflow/www/static/js/components/Table/Cells.tsx
b/airflow/www/static/js/components/Table/Cells.tsx
index 346acea91e..9f75e0e484 100644
--- a/airflow/www/static/js/components/Table/Cells.tsx
+++ b/airflow/www/static/js/components/Table/Cells.tsx
@@ -150,6 +150,9 @@ export const TaskInstanceLink = ({ cell: { value, row } }:
CellProps) => {
const { sourceRunId, sourceDagId, sourceMapIndex } = row.original;
const gridUrl = getMetaValue("grid_url");
const dagId = getMetaValue("dag_id");
+ if (!value || !sourceRunId || !sourceDagId || !gridUrl) {
+ return null;
+ }
const stringToReplace = dagId || "__DAG_ID__";
const url = `${gridUrl?.replace(
stringToReplace,
diff --git a/airflow/www/static/js/types/api-generated.ts
b/airflow/www/static/js/types/api-generated.ts
index 9ed3dfa11e..c64bc2a9ee 100644
--- a/airflow/www/static/js/types/api-generated.ts
+++ b/airflow/www/static/js/types/api-generated.ts
@@ -684,31 +684,8 @@ export interface paths {
"/datasets/events": {
/** Get dataset events */
get: operations["get_dataset_events"];
- parameters: {
- query: {
- /** The numbers of items to return. */
- limit?: components["parameters"]["PageLimit"];
- /** The number of items to skip before starting to collect the result
set. */
- offset?: components["parameters"]["PageOffset"];
- /**
- * The name of the field to order the results by.
- * Prefix a field name with `-` to reverse the sort order.
- *
- * *New in version 2.1.0*
- */
- order_by?: components["parameters"]["OrderBy"];
- /** The Dataset ID that updated the dataset. */
- dataset_id?: components["parameters"]["FilterDatasetID"];
- /** The DAG ID that updated the dataset. */
- source_dag_id?: components["parameters"]["FilterSourceDAGID"];
- /** The task ID that updated the dataset. */
- source_task_id?: components["parameters"]["FilterSourceTaskID"];
- /** The DAG run ID that updated the dataset. */
- source_run_id?: components["parameters"]["FilterSourceRunID"];
- /** The map index that updated the dataset. */
- source_map_index?: components["parameters"]["FilterSourceMapIndex"];
- };
- };
+ /** Create dataset event */
+ post: operations["create_dataset_event"];
};
"/config": {
get: operations["get_config"];
@@ -1825,6 +1802,12 @@ export interface components {
/** @description The dataset event creation time */
timestamp?: string;
};
+ CreateDatasetEvent: {
+ /** @description The URI of the dataset */
+ dataset_uri: string;
+ /** @description The dataset event extra */
+ extra?: { [key: string]: unknown } | null;
+ };
QueuedEvent: {
/** @description The datata uri. */
uri?: string;
@@ -4598,6 +4581,26 @@ export interface operations {
404: components["responses"]["NotFound"];
};
};
+ /** Create dataset event */
+ create_dataset_event: {
+ responses: {
+ /** Success. */
+ 200: {
+ content: {
+ "application/json": components["schemas"]["DatasetEvent"];
+ };
+ };
+ 400: components["responses"]["BadRequest"];
+ 401: components["responses"]["Unauthenticated"];
+ 403: components["responses"]["PermissionDenied"];
+ 404: components["responses"]["NotFound"];
+ };
+ requestBody: {
+ content: {
+ "application/json": components["schemas"]["CreateDatasetEvent"];
+ };
+ };
+ };
get_config: {
parameters: {
query: {
@@ -5167,6 +5170,9 @@ export type DatasetCollection = CamelCasedPropertiesDeep<
export type DatasetEvent = CamelCasedPropertiesDeep<
components["schemas"]["DatasetEvent"]
>;
+export type CreateDatasetEvent = CamelCasedPropertiesDeep<
+ components["schemas"]["CreateDatasetEvent"]
+>;
export type QueuedEvent = CamelCasedPropertiesDeep<
components["schemas"]["QueuedEvent"]
>;
@@ -5481,6 +5487,9 @@ export type GetDatasetVariables =
CamelCasedPropertiesDeep<
export type GetDatasetEventsVariables = CamelCasedPropertiesDeep<
operations["get_dataset_events"]["parameters"]["query"]
>;
+export type CreateDatasetEventVariables = CamelCasedPropertiesDeep<
+ operations["create_dataset_event"]["requestBody"]["content"]["application/json"]
+>;
export type GetConfigVariables = CamelCasedPropertiesDeep<
operations["get_config"]["parameters"]["query"]
>;
diff --git a/tests/api_connexion/endpoints/test_dataset_endpoint.py
b/tests/api_connexion/endpoints/test_dataset_endpoint.py
index 801ce65c89..15f37b2269 100644
--- a/tests/api_connexion/endpoints/test_dataset_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dataset_endpoint.py
@@ -18,6 +18,7 @@ from __future__ import annotations
import urllib
from typing import Generator
+from unittest.mock import ANY
import pytest
import time_machine
@@ -53,6 +54,7 @@ def configured_app(minimal_app_for_api):
role_name="Test",
permissions=[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET),
+ (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DATASET),
],
)
create_user(app, username="test_no_permissions",
role_name="TestNoPermissions") # type: ignore
@@ -577,6 +579,53 @@ class TestGetDatasetEvents(TestDatasetEndpoint):
}
+class TestPostDatasetEvents(TestDatasetEndpoint):
+ @pytest.fixture
+ def time_freezer(self) -> Generator:
+ freezer = time_machine.travel(self.default_time, tick=False)
+ freezer.start()
+
+ yield
+
+ freezer.stop()
+
+ @pytest.mark.usefixtures("time_freezer")
+ def test_should_respond_200(self, session):
+ self._create_dataset(session)
+ event_payload = {"dataset_uri": "s3://bucket/key", "extra": {"foo":
"bar"}}
+ response = self.client.post(
+ "/api/v1/datasets/events", json=event_payload,
environ_overrides={"REMOTE_USER": "test"}
+ )
+
+ assert response.status_code == 200
+ response_data = response.json
+ assert response_data == {
+ "id": ANY,
+ "created_dagruns": [],
+ "dataset_uri": event_payload["dataset_uri"],
+ "dataset_id": ANY,
+ "extra": {"foo": "bar", "from_rest_api": True},
+ "source_dag_id": None,
+ "source_task_id": None,
+ "source_run_id": None,
+ "source_map_index": -1,
+ "timestamp": self.default_time,
+ }
+
+ def test_order_by_raises_400_for_invalid_attr(self, session):
+ self._create_dataset(session)
+ event_invalid_payload = {"dataset_uri": "TEST_DATASET_URI", "extra":
{"foo": "bar"}, "fake": {}}
+ response = self.client.post(
+ "/api/v1/datasets/events", json=event_invalid_payload,
environ_overrides={"REMOTE_USER": "test"}
+ )
+ assert response.status_code == 400
+
+ def test_should_raises_401_unauthenticated(self, session):
+ self._create_dataset(session)
+ response = self.client.post("/api/v1/datasets/events",
json={"dataset_uri": "TEST_DATASET_URI"})
+ assert_401(response)
+
+
class TestGetDatasetEventsEndpointPagination(TestDatasetEndpoint):
@pytest.mark.parametrize(
"url, expected_event_runids",
diff --git a/tests/api_connexion/schemas/test_dataset_schema.py
b/tests/api_connexion/schemas/test_dataset_schema.py
index 9dd0d7d7f7..2a88dd9865 100644
--- a/tests/api_connexion/schemas/test_dataset_schema.py
+++ b/tests/api_connexion/schemas/test_dataset_schema.py
@@ -162,6 +162,37 @@ class TestDatasetEventSchema(TestDatasetSchemaBase):
}
+class TestDatasetEventCreateSchema(TestDatasetSchemaBase):
+ def test_serialize(self, session):
+ d = DatasetModel("s3://abc")
+ session.add(d)
+ session.commit()
+ event = DatasetEvent(
+ id=1,
+ dataset_id=d.id,
+ extra={"foo": "bar"},
+ source_dag_id=None,
+ source_task_id=None,
+ source_run_id=None,
+ source_map_index=-1,
+ )
+ session.add(event)
+ session.flush()
+ serialized_data = dataset_event_schema.dump(event)
+ assert serialized_data == {
+ "id": 1,
+ "dataset_id": d.id,
+ "dataset_uri": "s3://abc",
+ "extra": {"foo": "bar"},
+ "source_dag_id": None,
+ "source_task_id": None,
+ "source_run_id": None,
+ "source_map_index": -1,
+ "timestamp": self.timestamp,
+ "created_dagruns": [],
+ }
+
+
class TestDatasetEventCollectionSchema(TestDatasetSchemaBase):
def test_serialize(self, session):
common = {