This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 786ceec71fb feat(hitl): add utility functions for generating the url 
to required actions page  (#54827)
786ceec71fb is described below

commit 786ceec71fb6f61cd45439349115fca99ffe17ee
Author: Wei Lee <[email protected]>
AuthorDate: Wed Aug 27 16:41:16 2025 +0800

    feat(hitl): add utility functions for generating the url to required 
actions page  (#54827)
---
 airflow-core/docs/tutorial/hitl.rst                |  15 +-
 .../standard/example_dags/example_hitl_operator.py |   5 +
 .../airflow/providers/standard/operators/hitl.py   | 116 ++++++++++++++-
 .../tests/unit/standard/operators/test_hitl.py     | 163 ++++++++++++++++++++-
 4 files changed, 289 insertions(+), 10 deletions(-)

diff --git a/airflow-core/docs/tutorial/hitl.rst 
b/airflow-core/docs/tutorial/hitl.rst
index 70de9df757b..b104bc7d033 100644
--- a/airflow-core/docs/tutorial/hitl.rst
+++ b/airflow-core/docs/tutorial/hitl.rst
@@ -149,9 +149,18 @@ After the branch is chosen, the workflow will proceed 
along the selected path.
 Notifiers
 ---------
 
-A notifier is a callback mechanism that allows you to handle HITL events, such 
as when a task is waiting for human input, succeeds, or fails.
-The example uses a notifier ``LocalLogNotifier`` that logs messages for 
demonstration.
-You can implement your own notifier for different functionalities.
+A notifier is a callback mechanism for handling HITL events, such as when a 
task is waiting for human input, succeeds, or fails.
+The example uses the ``LocalLogNotifier``, which logs messages for 
demonstration purposes.
+
+The method ``HITLOperator.generate_link_to_ui_from_context`` can be used to 
generate a direct link to the UI page where the user should respond. It accepts 
four arguments:
+
+- ``context`` – automatically passed to ``notify`` by the notifier
+- ``base_url`` – (optional) the base URL of the Airflow UI; if not provided, 
``api.base_url`` in the configuration will be used
+- ``options`` – (optional) pre-selected options for the UI page
+- ``params_input`` – (optional) pre-loaded inputs for the UI page
+
+This makes it easy to include actionable links in notifications or logs.
+You can also implement your own notifier to provide different functionalities.
 For more details, please refer to `Creating a notifier 
<https://airflow.apache.org/docs/apache-airflow/stable/howto/notifications.html>`_
 and `Notifications 
<https://airflow.apache.org/docs/apache-airflow-providers/core-extensions/notifications.html>`_.
 
 In the example Dag, the notifier is defined as follows:
diff --git 
a/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
 
b/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
index ca3ca4dc1ee..dd7b4dc277f 100644
--- 
a/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
+++ 
b/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
@@ -47,7 +47,12 @@ class LocalLogNotifier(BaseNotifier):
         self.message = message
 
     def notify(self, context: Context) -> None:
+        url = HITLOperator.generate_link_to_ui_from_context(
+            context=context,
+            base_url="http://localhost:28080",
+        )
         self.log.info(self.message)
+        self.log.info("Url to respond %s", url)
 
 
 hitl_request_callback = LocalLogNotifier(
diff --git 
a/providers/standard/src/airflow/providers/standard/operators/hitl.py 
b/providers/standard/src/airflow/providers/standard/operators/hitl.py
index 207771b4f40..bad99f18ee7 100644
--- a/providers/standard/src/airflow/providers/standard/operators/hitl.py
+++ b/providers/standard/src/airflow/providers/standard/operators/hitl.py
@@ -20,26 +20,28 @@ import logging
 
 from airflow.exceptions import AirflowOptionalProviderFeatureException
 from airflow.providers.standard.version_compat import AIRFLOW_V_3_1_PLUS
-from airflow.sdk.bases.notifier import BaseNotifier
 
 if not AIRFLOW_V_3_1_PLUS:
     raise AirflowOptionalProviderFeatureException("Human in the loop 
functionality needs Airflow 3.1+.")
 
-
 from collections.abc import Collection, Mapping, Sequence
 from typing import TYPE_CHECKING, Any
+from urllib.parse import ParseResult, urlencode, urlparse, urlunparse
 
+from airflow.configuration import conf
 from airflow.providers.standard.exceptions import HITLTimeoutError, 
HITLTriggerEventError
 from airflow.providers.standard.operators.branch import BranchMixIn
 from airflow.providers.standard.triggers.hitl import HITLTrigger, 
HITLTriggerEventSuccessPayload
 from airflow.providers.standard.utils.skipmixin import SkipMixin
 from airflow.providers.standard.version_compat import BaseOperator
+from airflow.sdk.bases.notifier import BaseNotifier
 from airflow.sdk.definitions.param import ParamsDict
 from airflow.sdk.execution_time.hitl import upsert_hitl_detail
 from airflow.sdk.timezone import utcnow
 
 if TYPE_CHECKING:
     from airflow.sdk.definitions.context import Context
+    from airflow.sdk.types import RuntimeTaskInstanceProtocol
 
 
 class HITLOperator(BaseOperator):
@@ -87,12 +89,33 @@ class HITLOperator(BaseOperator):
         self.respondents = [respondents] if isinstance(respondents, str) else 
respondents
 
         self.validate_options()
+        self.validate_params()
         self.validate_defaults()
 
     def validate_options(self) -> None:
+        """
+        Validate the `options` attribute of the instance.
+
+        Raises:
+            ValueError: If `options` is empty.
+            ValueError: If any option contains a comma (`,`), which is not 
allowed.
+        """
         if not self.options:
             raise ValueError('"options" cannot be empty.')
 
+        if any("," in option for option in self.options):
+            raise ValueError('"," is not allowed in option')
+
+    def validate_params(self) -> None:
+        """
+        Validate the `params` attribute of the instance.
+
+        Raises:
+            ValueError: If `"_options"` key is present in `params`, which is 
not allowed.
+        """
+        if "_options" in self.params:
+            raise ValueError('"_options" is not allowed in params')
+
     def validate_defaults(self) -> None:
         """
         Validate whether the given defaults pass the following criteria.
@@ -181,6 +204,95 @@ class HITLOperator(BaseOperator):
         ):
             raise ValueError(f"params_input {params_input} does not match 
params {self.params}")
 
+    def generate_link_to_ui(
+        self,
+        *,
+        task_instance: RuntimeTaskInstanceProtocol,
+        base_url: str | None = None,
+        options: str | list[str] | None = None,
+        params_input: dict[str, Any] | None = None,
+    ) -> str:
+        """
+        Generate a URL link to the "required actions" page for a specific task 
instance.
+
+        This URL includes query parameters based on allowed options and 
parameters.
+
+        Args:
+            task_instance: The task instance to generate the link for.
+            base_url: Optional base URL to use. Defaults to ``api.base_url`` 
from config.
+            options: Optional subset of allowed options to include in the URL.
+            params_input: Optional subset of allowed params to include in the 
URL.
+
+        Raises:
+            ValueError: If any provided option or parameter is invalid.
+            ValueError: If no base_url can be determined.
+
+        Returns:
+            The full URL pointing to the required actions page with query 
parameters.
+        """
+        query_param: dict[str, Any] = {}
+        options = [options] if isinstance(options, str) else options
+        if options:
+            if diff := set(options) - set(self.options):
+                raise ValueError(f"options {diff} are not valid options")
+            query_param["_options"] = ",".join(options)
+
+        if params_input:
+            if diff := set(params_input.keys()) - set(self.params.keys()):
+                raise ValueError(f"params {diff} are not valid params")
+            query_param.update(params_input)
+
+        if not (base_url := base_url or conf.get("api", "base_url", 
fallback=None)):
+            raise ValueError("Not able to retrieve base_url")
+
+        query_param["map_index"] = task_instance.map_index
+
+        parsed_base_url: ParseResult = urlparse(base_url)
+        return urlunparse(
+            (
+                parsed_base_url.scheme,
+                parsed_base_url.netloc,
+                
f"/dags/{task_instance.dag_id}/runs/{task_instance.run_id}/tasks/{task_instance.task_id}/required_actions",
+                "",
+                urlencode(query_param) if query_param else "",
+                "",
+            )
+        )
+
+    @staticmethod
+    def generate_link_to_ui_from_context(
+        *,
+        context: Context,
+        base_url: str | None = None,
+        options: list[str] | None = None,
+        params_input: dict[str, Any] | None = None,
+    ) -> str:
+        """
+        Generate a "required actions" page URL from a task context.
+
+        Delegates to ``generate_link_to_ui`` using the task and task_instance 
extracted from
+        the provided context.
+
+        Args:
+            context: The Airflow task context containing 'task' and 
'task_instance'.
+            base_url: Optional base URL to use.
+            options: Optional list of allowed options to include.
+            params_input: Optional dictionary of allowed parameters to include.
+
+        Returns:
+            The full URL pointing to the required actions page with query 
parameters.
+        """
+        hitl_op = context["task"]
+        if not isinstance(hitl_op, HITLOperator):
+            raise ValueError("This method only supports HITLOperator")
+
+        return hitl_op.generate_link_to_ui(
+            task_instance=context["task_instance"],
+            base_url=base_url,
+            options=options,
+            params_input=params_input,
+        )
+
 
 class ApprovalOperator(HITLOperator, SkipMixin):
     """Human-in-the-loop Operator that has only 'Approval' and 'Reject' 
options."""
diff --git a/providers/standard/tests/unit/standard/operators/test_hitl.py 
b/providers/standard/tests/unit/standard/operators/test_hitl.py
index df5777547d9..7d311dec451 100644
--- a/providers/standard/tests/unit/standard/operators/test_hitl.py
+++ b/providers/standard/tests/unit/standard/operators/test_hitl.py
@@ -31,7 +31,7 @@ import pytest
 from sqlalchemy import select
 
 from airflow.exceptions import DownstreamTasksSkipped
-from airflow.models import Trigger
+from airflow.models import TaskInstance, Trigger
 from airflow.models.hitl import HITLDetail
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.providers.standard.operators.hitl import (
@@ -43,6 +43,8 @@ from airflow.providers.standard.operators.hitl import (
 from airflow.sdk import Param, timezone
 from airflow.sdk.definitions.param import ParamsDict
 
+from tests_common.test_utils.config import conf_vars
+
 if TYPE_CHECKING:
     from sqlalchemy.orm import Session
 
@@ -54,6 +56,23 @@ DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 INTERVAL = datetime.timedelta(hours=12)
 
 
+@pytest.fixture
+def hitl_task_and_ti_for_generating_link(dag_maker: DagMaker) -> 
tuple[HITLOperator, TaskInstance]:
+    with dag_maker("test_dag"):
+        task = HITLOperator(
+            task_id="hitl_test",
+            subject="This is subject",
+            options=["1", "2", "3", "4", "5"],
+            body="This is body",
+            defaults=["1"],
+            respondents="test",
+            multiple=True,
+            params=ParamsDict({"input_1": 1, "input_2": 2, "input_3": 3}),
+        )
+    dr = dag_maker.create_dagrun()
+    return task, dag_maker.run_ti(task.task_id, dr)
+
+
 class TestHITLOperator:
     def test_validate_options(self) -> None:
         hitl_op = HITLOperator(
@@ -65,20 +84,42 @@ class TestHITLOperator:
             multiple=False,
             params=ParamsDict({"input_1": 1}),
         )
-        hitl_op.validate_defaults()
+        hitl_op.validate_options()
 
-    def test_validate_options_with_empty_options(self) -> None:
-        with pytest.raises(ValueError, match='"options" cannot be empty.'):
+    @pytest.mark.parametrize(
+        "options, expected_err_msg",
+        [
+            ([], '"options" cannot be empty.'),
+            (["1,1", "1", "2"], '"," is not allowed in option'),
+        ],
+        ids=["empty", "comma"],
+    )
+    def test_validate_options_with_empty_options(self, options: list[str], 
expected_err_msg: str) -> None:
+        # validate_options is called during initialization
+        with pytest.raises(ValueError, match=expected_err_msg):
             HITLOperator(
                 task_id="hitl_test",
                 subject="This is subject",
-                options=[],
+                options=options,
                 body="This is body",
                 defaults=["1"],
                 multiple=False,
                 params=ParamsDict({"input_1": 1}),
             )
 
+    def test_validate_params_with__options(self) -> None:
+        # validate_params is called during initialization
+        with pytest.raises(ValueError, match='"_options" is not allowed in 
params'):
+            HITLOperator(
+                task_id="hitl_test",
+                subject="This is subject",
+                options=["1", "2"],
+                body="This is body",
+                defaults=["1"],
+                multiple=False,
+                params=ParamsDict({"_options": 1}),
+            )
+
     def test_validate_defaults(self) -> None:
         hitl_op = HITLOperator(
             task_id="hitl_test",
@@ -110,6 +151,7 @@ class TestHITLOperator:
         extra_kwargs: dict[str, Any],
         expected_error_msg: str,
     ) -> None:
+        # validate_defaults is called during initialization
         with pytest.raises(ValueError, match=expected_error_msg):
             HITLOperator(
                 task_id="hitl_test",
@@ -238,6 +280,117 @@ class TestHITLOperator:
                 },
             )
 
+    @pytest.mark.parametrize(
+        "options, params_input, expected_query_string",
+        [
+            (None, None, "?map_index=-1"),
+            ("1", None, "?_options=1&map_index=-1"),
+            (["1", "2"], None, "?_options=1%2C2&map_index=-1"),
+            (None, {"input_1": 123}, "?input_1=123&map_index=-1"),
+            (
+                ["3", "4", "5"],
+                {"input_1": 123123, "input_2": 345345},
+                
"?_options=3%2C4%2C5&input_1=123123&input_2=345345&map_index=-1",
+            ),
+        ],
+        ids=[
+            "empty",
+            "single-option",
+            "multiple-options",
+            "single-param-input",
+            "multiple-options-and-param-inputs",
+        ],
+    )
+    @pytest.mark.parametrize("base_url", ["http://test", "http://test_2:8080"])
+    def test_generate_link_to_ui(
+        self,
+        base_url: str,
+        options: list[str] | None,
+        params_input: dict[str, Any] | None,
+        expected_query_string: str,
+        hitl_task_and_ti_for_generating_link: tuple[HITLOperator, 
TaskInstance],
+    ) -> None:
+        expected_url = (
+            
f"{base_url}/dags/test_dag/runs/test/tasks/hitl_test/required_actions{expected_query_string}"
+        )
+        task, ti = hitl_task_and_ti_for_generating_link
+        url = task.generate_link_to_ui(
+            task_instance=ti,
+            base_url=base_url,
+            options=options,
+            params_input=params_input,
+        )
+        assert url == expected_url
+
+    @pytest.mark.parametrize(
+        "options, params_input, expected_query_string",
+        [
+            (None, None, "?map_index=-1"),
+            ("1", None, "?_options=1&map_index=-1"),
+            (["1", "2"], None, "?_options=1%2C2&map_index=-1"),
+            (None, {"input_1": 123}, "?input_1=123&map_index=-1"),
+            (
+                ["3", "4", "5"],
+                {"input_1": 123123, "input_2": 345345},
+                
"?_options=3%2C4%2C5&input_1=123123&input_2=345345&map_index=-1",
+            ),
+        ],
+        ids=[
+            "empty",
+            "single-option",
+            "multiple-options",
+            "single-param-input",
+            "multiple-options-and-param-inputs",
+        ],
+    )
+    @conf_vars({("api", "base_url"): "http://localhost:8080/"})
+    def test_generate_link_fall_back_to_conf_api_base_url(
+        self,
+        options: list[str] | None,
+        params_input: dict[str, Any] | None,
+        expected_query_string: str,
+        hitl_task_and_ti_for_generating_link: tuple[HITLOperator, 
TaskInstance],
+    ) -> None:
+        task, ti = hitl_task_and_ti_for_generating_link
+        expected_url = 
f"http://localhost:8080/dags/test_dag/runs/test/tasks/hitl_test/required_actions{expected_query_string}"
+
+        url = task.generate_link_to_ui(
+            task_instance=ti,
+            options=options,
+            params_input=params_input,
+        )
+        assert url == expected_url
+
+    @pytest.mark.parametrize(
+        "options, params_input, expected_err_msg",
+        [
+            ([100, "2", 30000], None, "options {.*} are not valid options"),
+            (
+                None,
+                {"input_not_exist": 123, "no_such_key": 123},
+                "params {.*} are not valid params",
+            ),
+        ],
+    )
+    def test_generate_link_to_ui_with_invalid_input(
+        self,
+        options: list[str] | None,
+        params_input: dict[str, Any] | None,
+        expected_err_msg: str,
+        hitl_task_and_ti_for_generating_link: tuple[HITLOperator, 
TaskInstance],
+    ) -> None:
+        task, ti = hitl_task_and_ti_for_generating_link
+        with pytest.raises(ValueError, match=expected_err_msg):
+            task.generate_link_to_ui(task_instance=ti, options=options, 
params_input=params_input)
+
+    def test_generate_link_to_ui_without_base_url(
+        self,
+        hitl_task_and_ti_for_generating_link: tuple[HITLOperator, 
TaskInstance],
+    ) -> None:
+        task, ti = hitl_task_and_ti_for_generating_link
+        with pytest.raises(ValueError, match="Not able to retrieve base_url"):
+            task.generate_link_to_ui(task_instance=ti)
+
 
 class TestApprovalOperator:
     def test_init_with_options(self) -> None:

Reply via email to