Lee-W commented on code in PR #53189:
URL: https://github.com/apache/airflow/pull/53189#discussion_r2208036291


##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl.py:
##########
@@ -75,3 +106,7 @@ class HITLDetailCollection(BaseModel):
 
     hitl_details: list[HITLDetail]
     total_entries: int
+
+    # Shared link action request fields
+    response_content: list[str] | None = None

Review Comment:
   This has been updated to chosen_options, I think we'll need to update it



##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl.py:
##########
@@ -63,6 +81,19 @@ class HITLDetail(BaseModel):
 
     response_received: bool = False
 
+    # Shared link fields
+    link_type: str = Field(
+        default="action",
+        description="Type of link to generate: 'action' for direct action or 
'redirect' for UI interaction",
+    )
+    action: str | None = Field(
+        default=None,
+        description="Optional action to perform when link is accessed (e.g., 
'approve', 'reject'). Required for action links.",
+    )
+    expires_in_hours: int | None = Field(default=None, description="Optional 
custom expiration time in hours")

Review Comment:
   Why do we need it if we already have `expires_at`



##########
airflow-core/src/airflow/utils/hitl_shared_links.py:
##########
@@ -0,0 +1,220 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Utilities for Human-in-the-Loop (HITL) shared links."""
+
+from __future__ import annotations
+
+import base64
+import hashlib
+import hmac
+import json
+from datetime import datetime, timedelta
+from typing import Any
+from urllib.parse import urlencode
+
+import structlog
+
+from airflow.configuration import conf
+from airflow.utils import timezone
+
+log = structlog.get_logger(__name__)
+
+
+class HITLSharedLinkManager:

Review Comment:
   Do we need to make this part of core. or should we move to standard provider?



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py:
##########
@@ -272,3 +293,456 @@ def get_hitl_details(
         hitl_details=hitl_details,
         total_entries=total_entries,
     )
+
+
+@hitl_router.post(
+    "/api/v2/hitl-details-share-link/{dag_id}/{dag_run_id}/{task_id}",
+    status_code=status.HTTP_201_CREATED,
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_403_FORBIDDEN,
+        ]
+    ),
+    dependencies=[Depends(requires_access_dag(method="GET", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def create_hitl_share_link(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    update_hitl_detail_payload: UpdateHITLDetailPayload,
+    user: GetUserDep,
+    session: SessionDep,
+) -> HITLDetailResponse:
+    """
+    Create a shared link for a Human-in-the-loop task.
+
+    This endpoint generates a secure, time-limited shared link that allows 
external users
+    to interact with HITL tasks without requiring full Airflow authentication. 
The link
+    can be configured for either direct action execution or UI redirection.
+
+    :param dag_id: The DAG identifier
+    :param dag_run_id: The DAG run identifier
+    :param task_id: The task identifier
+    :param update_hitl_detail_payload: Payload containing link configuration 
and initial response data
+    :param user: The authenticated user creating the shared link
+    :param session: Database session for data persistence
+
+    :raises HTTPException: 403 if HITL shared links are not enabled
+    :raises HTTPException: 404 if the task instance or HITL detail does not 
exist
+    :raises HTTPException: 400 if link generation fails due to invalid 
parameters
+
+    :return: HITLDetailResponse containing the generated link URL and metadata
+    """
+    if not hitl_shared_link_manager.is_enabled():
+        raise HTTPException(
+            status.HTTP_403_FORBIDDEN,
+            "HITL shared links are not enabled",
+        )
+
+    task_instance = _get_task_instance(
+        dag_id=dag_id,
+        dag_run_id=dag_run_id,
+        task_id=task_id,
+        session=session,
+        map_index=None,
+    )
+
+    ti_id_str = str(task_instance.id)
+    hitl_detail_model = 
session.scalar(select(HITLDetailModel).where(HITLDetailModel.ti_id == 
ti_id_str))
+    if not hitl_detail_model:
+        raise HTTPException(
+            status.HTTP_404_NOT_FOUND,
+            f"Human-in-the-loop detail does not exist for Task Instance with 
id {ti_id_str}",
+        )
+
+    try:
+        link_data = hitl_shared_link_manager.generate_link(
+            dag_id=dag_id,
+            dag_run_id=dag_run_id,
+            task_id=task_id,
+            map_index=None,
+            link_type=update_hitl_detail_payload.link_type,
+            action=update_hitl_detail_payload.action,
+            expires_in_hours=update_hitl_detail_payload.expires_in_hours,
+        )
+
+        response = HITLDetailResponse(
+            user_id=user.get_id(),
+            response_at=timezone.utcnow(),
+            chosen_options=update_hitl_detail_payload.chosen_options,
+            params_input=update_hitl_detail_payload.params_input,
+            task_instance_id=link_data["task_instance_id"],
+            link_url=link_data["link_url"],
+            expires_at=link_data["expires_at"],
+            action=link_data["action"],
+            link_type=link_data["link_type"],
+        )
+
+        return response
+
+    except ValueError as e:
+        raise HTTPException(
+            status.HTTP_400_BAD_REQUEST,
+            str(e),
+        )
+
+
+@hitl_router.post(
+    
"/api/v2/hitl-details-share-link/{dag_id}/{dag_run_id}/{task_id}/{map_index}",
+    status_code=status.HTTP_201_CREATED,
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_403_FORBIDDEN,
+        ]
+    ),
+    dependencies=[Depends(requires_access_dag(method="GET", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def create_mapped_ti_hitl_share_link(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    map_index: int,
+    update_hitl_detail_payload: UpdateHITLDetailPayload,
+    user: GetUserDep,
+    session: SessionDep,
+) -> HITLDetailResponse:
+    """
+    Create a shared link for a mapped Human-in-the-loop task.
+
+    This endpoint generates a secure, time-limited shared link for mapped task 
instances,
+    allowing external users to interact with specific mapped HITL tasks 
without requiring
+    full Airflow authentication. The link can be configured for either direct 
action
+    execution or UI redirection.
+
+    :param dag_id: The DAG identifier
+    :param dag_run_id: The DAG run identifier
+    :param task_id: The task identifier
+    :param map_index: The map index for the mapped task instance
+    :param update_hitl_detail_payload: Payload containing link configuration 
and initial response data
+    :param user: The authenticated user creating the shared link
+    :param session: Database session for data persistence
+    """
+    if not hitl_shared_link_manager.is_enabled():
+        raise HTTPException(
+            status.HTTP_403_FORBIDDEN,
+            "HITL shared links are not enabled",
+        )
+
+    task_instance = _get_task_instance(
+        dag_id=dag_id,
+        dag_run_id=dag_run_id,
+        task_id=task_id,
+        session=session,
+        map_index=map_index,
+    )
+
+    ti_id_str = str(task_instance.id)
+    hitl_detail_model = 
session.scalar(select(HITLDetailModel).where(HITLDetailModel.ti_id == 
ti_id_str))
+    if not hitl_detail_model:
+        raise HTTPException(
+            status.HTTP_404_NOT_FOUND,
+            f"Human-in-the-loop detail does not exist for Task Instance with 
id {ti_id_str}",
+        )
+
+    try:
+        link_data = hitl_shared_link_manager.generate_link(
+            dag_id=dag_id,
+            dag_run_id=dag_run_id,
+            task_id=task_id,
+            map_index=map_index,
+            link_type=update_hitl_detail_payload.link_type,
+            action=update_hitl_detail_payload.action,
+            expires_in_hours=update_hitl_detail_payload.expires_in_hours,
+        )
+
+        response = HITLDetailResponse(
+            user_id=user.get_id(),
+            response_at=timezone.utcnow(),
+            chosen_options=update_hitl_detail_payload.chosen_options,
+            params_input=update_hitl_detail_payload.params_input,
+            task_instance_id=link_data["task_instance_id"],
+            link_url=link_data["link_url"],
+            expires_at=link_data["expires_at"],
+            action=link_data["action"],
+            link_type=link_data["link_type"],
+        )
+
+        return response
+
+    except ValueError as e:
+        raise HTTPException(
+            status.HTTP_400_BAD_REQUEST,
+            str(e),
+        )
+
+
+@hitl_router.get(
+    "/api/v2/hitl-details-share-link/{dag_id}/{dag_run_id}/{task_id}",
+    status_code=status.HTTP_200_OK,
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_403_FORBIDDEN,
+        ]
+    ),
+)
+def get_hitl_share_link(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    payload: str,
+    signature: str,
+    session: SessionDep,
+) -> HITLDetail:
+    """
+    Get HITL details via shared link (for redirect links).
+
+    This endpoint allows external users to access HITL task details through a 
secure
+    shared link. The link must be a redirect-type link, which provides 
read-only access
+    to the HITL task information for UI rendering or decision-making purposes.
+
+    :param dag_id: The DAG identifier (from URL path)
+    :param dag_run_id: The DAG run identifier (from URL path)
+    :param task_id: The task identifier (from URL path)
+    :param payload: Base64-encoded payload containing link metadata and 
expiration
+    :param signature: HMAC signature for payload verification
+    :param session: Database session for data retrieval
+    """
+    if not hitl_shared_link_manager.is_enabled():
+        raise HTTPException(
+            status.HTTP_403_FORBIDDEN,
+            "HITL shared links are not enabled",
+        )
+
+    try:
+        link_data = hitl_shared_link_manager.verify_link(payload, signature)
+
+        if link_data.get("link_type") != "redirect":
+            raise HTTPException(
+                status.HTTP_400_BAD_REQUEST,
+                "This link is not a redirect link",
+            )
+
+        return _get_hitl_detail(
+            dag_id=link_data["dag_id"],
+            dag_run_id=link_data["dag_run_id"],
+            task_id=link_data["task_id"],
+            session=session,
+            map_index=link_data.get("map_index"),
+        )
+
+    except ValueError as e:
+        raise HTTPException(
+            status.HTTP_400_BAD_REQUEST,
+            str(e),
+        )
+
+
+@hitl_router.get(
+    
"/api/v2/hitl-details-share-link/{dag_id}/{dag_run_id}/{task_id}/{map_index}",
+    status_code=status.HTTP_200_OK,
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_403_FORBIDDEN,
+        ]
+    ),
+)
+def get_mapped_ti_hitl_share_link(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    map_index: int,
+    payload: str,
+    signature: str,
+    session: SessionDep,
+) -> HITLDetail:
+    """
+    Get mapped HITL details via shared link (for redirect links).
+
+    This endpoint allows external users to access mapped HITL task details 
through a secure
+    shared link. The link must be a redirect-type link, which provides 
read-only access
+    to the mapped HITL task information for UI rendering or decision-making 
purposes.
+
+    :param dag_id: The DAG identifier (from URL path)
+    :param dag_run_id: The DAG run identifier (from URL path)
+    :param task_id: The task identifier (from URL path)
+    :param map_index: The map index for the mapped task instance (from URL 
path)
+    :param payload: Base64-encoded payload containing link metadata and 
expiration
+    :param signature: HMAC signature for payload verification
+    :param session: Database session for data retrieval
+    """
+    if not hitl_shared_link_manager.is_enabled():

Review Comment:
   similiar logic as the one above



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py:
##########
@@ -47,6 +48,15 @@ def _get_task_instance(
     session: SessionDep,
     map_index: int | None = None,
 ) -> TI:
+    """
+    Get a task instance by its identifiers.
+
+    :param dag_id: DAG ID

Review Comment:
   ```suggestion
       :param dag_id: dag ID
   ```



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py:
##########
@@ -272,3 +293,456 @@ def get_hitl_details(
         hitl_details=hitl_details,
         total_entries=total_entries,
     )
+
+
+@hitl_router.post(
+    "/api/v2/hitl-details-share-link/{dag_id}/{dag_run_id}/{task_id}",
+    status_code=status.HTTP_201_CREATED,
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_403_FORBIDDEN,
+        ]
+    ),
+    dependencies=[Depends(requires_access_dag(method="GET", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def create_hitl_share_link(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    update_hitl_detail_payload: UpdateHITLDetailPayload,
+    user: GetUserDep,
+    session: SessionDep,
+) -> HITLDetailResponse:
+    """
+    Create a shared link for a Human-in-the-loop task.
+
+    This endpoint generates a secure, time-limited shared link that allows 
external users
+    to interact with HITL tasks without requiring full Airflow authentication. 
The link
+    can be configured for either direct action execution or UI redirection.
+
+    :param dag_id: The DAG identifier
+    :param dag_run_id: The DAG run identifier
+    :param task_id: The task identifier
+    :param update_hitl_detail_payload: Payload containing link configuration 
and initial response data
+    :param user: The authenticated user creating the shared link
+    :param session: Database session for data persistence
+
+    :raises HTTPException: 403 if HITL shared links are not enabled
+    :raises HTTPException: 404 if the task instance or HITL detail does not 
exist
+    :raises HTTPException: 400 if link generation fails due to invalid 
parameters
+
+    :return: HITLDetailResponse containing the generated link URL and metadata
+    """
+    if not hitl_shared_link_manager.is_enabled():
+        raise HTTPException(
+            status.HTTP_403_FORBIDDEN,
+            "HITL shared links are not enabled",
+        )
+
+    task_instance = _get_task_instance(
+        dag_id=dag_id,
+        dag_run_id=dag_run_id,
+        task_id=task_id,
+        session=session,
+        map_index=None,
+    )
+
+    ti_id_str = str(task_instance.id)
+    hitl_detail_model = 
session.scalar(select(HITLDetailModel).where(HITLDetailModel.ti_id == 
ti_id_str))
+    if not hitl_detail_model:
+        raise HTTPException(
+            status.HTTP_404_NOT_FOUND,
+            f"Human-in-the-loop detail does not exist for Task Instance with 
id {ti_id_str}",
+        )
+
+    try:
+        link_data = hitl_shared_link_manager.generate_link(
+            dag_id=dag_id,
+            dag_run_id=dag_run_id,
+            task_id=task_id,
+            map_index=None,
+            link_type=update_hitl_detail_payload.link_type,
+            action=update_hitl_detail_payload.action,
+            expires_in_hours=update_hitl_detail_payload.expires_in_hours,
+        )
+
+        response = HITLDetailResponse(
+            user_id=user.get_id(),
+            response_at=timezone.utcnow(),
+            chosen_options=update_hitl_detail_payload.chosen_options,
+            params_input=update_hitl_detail_payload.params_input,
+            task_instance_id=link_data["task_instance_id"],
+            link_url=link_data["link_url"],
+            expires_at=link_data["expires_at"],
+            action=link_data["action"],
+            link_type=link_data["link_type"],
+        )
+
+        return response
+
+    except ValueError as e:
+        raise HTTPException(
+            status.HTTP_400_BAD_REQUEST,
+            str(e),
+        )
+
+
+@hitl_router.post(
+    
"/api/v2/hitl-details-share-link/{dag_id}/{dag_run_id}/{task_id}/{map_index}",
+    status_code=status.HTTP_201_CREATED,
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_403_FORBIDDEN,
+        ]
+    ),
+    dependencies=[Depends(requires_access_dag(method="GET", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def create_mapped_ti_hitl_share_link(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    map_index: int,
+    update_hitl_detail_payload: UpdateHITLDetailPayload,
+    user: GetUserDep,
+    session: SessionDep,
+) -> HITLDetailResponse:
+    """
+    Create a shared link for a mapped Human-in-the-loop task.
+
+    This endpoint generates a secure, time-limited shared link for mapped task 
instances,
+    allowing external users to interact with specific mapped HITL tasks 
without requiring
+    full Airflow authentication. The link can be configured for either direct 
action
+    execution or UI redirection.
+
+    :param dag_id: The DAG identifier
+    :param dag_run_id: The DAG run identifier
+    :param task_id: The task identifier
+    :param map_index: The map index for the mapped task instance
+    :param update_hitl_detail_payload: Payload containing link configuration 
and initial response data
+    :param user: The authenticated user creating the shared link
+    :param session: Database session for data persistence
+    """
+    if not hitl_shared_link_manager.is_enabled():

Review Comment:
   The logic looks similar to the one above. we probably can refactor it



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py:
##########
@@ -260,7 +281,7 @@ def get_hitl_details(
     readable_ti_filter: ReadableTIFilterDep,
     session: SessionDep,
 ) -> HITLDetailCollection:
-    """Get Human-in-the-loop details."""
+    """Get all Human-in-the-loop details."""

Review Comment:
   We're going to support the filter. so the original docstring might be more 
accurate



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py:
##########
@@ -75,10 +85,21 @@ def _update_hitl_detail(
     dag_run_id: str,
     task_id: str,
     update_hitl_detail_payload: UpdateHITLDetailPayload,
-    user: GetUserDep,
+    user: GetUserDep | None,
     session: SessionDep,
     map_index: int | None = None,
 ) -> HITLDetailResponse:
+    """
+    Update a Human-in-the-loop detail.
+
+    :param dag_id: DAG ID

Review Comment:
   please replace all `DAG` with `Dag`. Thanks!



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py:
##########
@@ -47,6 +48,15 @@ def _get_task_instance(
     session: SessionDep,
     map_index: int | None = None,
 ) -> TI:
+    """
+    Get a task instance by its identifiers.
+
+    :param dag_id: DAG ID
+    :param dag_run_id: DAG run ID

Review Comment:
   ```suggestion
       :param dag_run_id: dag run ID
   ```



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py:
##########
@@ -75,10 +85,21 @@ def _update_hitl_detail(
     dag_run_id: str,
     task_id: str,
     update_hitl_detail_payload: UpdateHITLDetailPayload,
-    user: GetUserDep,
+    user: GetUserDep | None,

Review Comment:
   when will it be none?



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py:
##########
@@ -272,3 +293,456 @@ def get_hitl_details(
         hitl_details=hitl_details,
         total_entries=total_entries,
     )
+
+
+@hitl_router.post(
+    "/api/v2/hitl-details-share-link/{dag_id}/{dag_run_id}/{task_id}",
+    status_code=status.HTTP_201_CREATED,
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_403_FORBIDDEN,
+        ]
+    ),
+    dependencies=[Depends(requires_access_dag(method="GET", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def create_hitl_share_link(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    update_hitl_detail_payload: UpdateHITLDetailPayload,
+    user: GetUserDep,
+    session: SessionDep,
+) -> HITLDetailResponse:
+    """
+    Create a shared link for a Human-in-the-loop task.
+
+    This endpoint generates a secure, time-limited shared link that allows 
external users
+    to interact with HITL tasks without requiring full Airflow authentication. 
The link
+    can be configured for either direct action execution or UI redirection.
+
+    :param dag_id: The DAG identifier
+    :param dag_run_id: The DAG run identifier
+    :param task_id: The task identifier
+    :param update_hitl_detail_payload: Payload containing link configuration 
and initial response data
+    :param user: The authenticated user creating the shared link
+    :param session: Database session for data persistence
+
+    :raises HTTPException: 403 if HITL shared links are not enabled
+    :raises HTTPException: 404 if the task instance or HITL detail does not 
exist
+    :raises HTTPException: 400 if link generation fails due to invalid 
parameters
+
+    :return: HITLDetailResponse containing the generated link URL and metadata
+    """
+    if not hitl_shared_link_manager.is_enabled():
+        raise HTTPException(
+            status.HTTP_403_FORBIDDEN,
+            "HITL shared links are not enabled",
+        )

Review Comment:
   sounds good!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to