This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 786ceec71fb feat(hitl): add utility functions for generating the url
to required actions page (#54827)
786ceec71fb is described below
commit 786ceec71fb6f61cd45439349115fca99ffe17ee
Author: Wei Lee <[email protected]>
AuthorDate: Wed Aug 27 16:41:16 2025 +0800
feat(hitl): add utility functions for generating the url to required
actions page (#54827)
---
airflow-core/docs/tutorial/hitl.rst | 15 +-
.../standard/example_dags/example_hitl_operator.py | 5 +
.../airflow/providers/standard/operators/hitl.py | 116 ++++++++++++++-
.../tests/unit/standard/operators/test_hitl.py | 163 ++++++++++++++++++++-
4 files changed, 289 insertions(+), 10 deletions(-)
diff --git a/airflow-core/docs/tutorial/hitl.rst
b/airflow-core/docs/tutorial/hitl.rst
index 70de9df757b..b104bc7d033 100644
--- a/airflow-core/docs/tutorial/hitl.rst
+++ b/airflow-core/docs/tutorial/hitl.rst
@@ -149,9 +149,18 @@ After the branch is chosen, the workflow will proceed
along the selected path.
Notifiers
---------
-A notifier is a callback mechanism that allows you to handle HITL events, such
as when a task is waiting for human input, succeeds, or fails.
-The example uses a notifier ``LocalLogNotifier`` that logs messages for
demonstration.
-You can implement your own notifier for different functionalities.
+A notifier is a callback mechanism for handling HITL events, such as when a
task is waiting for human input, succeeds, or fails.
+The example uses the ``LocalLogNotifier``, which logs messages for
demonstration purposes.
+
+The method ``HITLOperator.generate_link_to_ui_from_context`` can be used to
generate a direct link to the UI page where the user should respond. It accepts
four arguments:
+
+- ``context`` – automatically passed to ``notify`` by the notifier
+- ``base_url`` – (optional) the base URL of the Airflow UI; if not provided,
``api.base_url`` in the configuration will be used
+- ``options`` – (optional) pre-selected options for the UI page
+- ``params_input`` – (optional) pre-loaded inputs for the UI page
+
+This makes it easy to include actionable links in notifications or logs.
+You can also implement your own notifier to provide different functionalities.
For more details, please refer to `Creating a notifier
<https://airflow.apache.org/docs/apache-airflow/stable/howto/notifications.html>`_
and `Notifications
<https://airflow.apache.org/docs/apache-airflow-providers/core-extensions/notifications.html>`_.
In the example Dag, the notifier is defined as follows:
diff --git
a/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
b/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
index ca3ca4dc1ee..dd7b4dc277f 100644
---
a/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
+++
b/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py
@@ -47,7 +47,12 @@ class LocalLogNotifier(BaseNotifier):
self.message = message
def notify(self, context: Context) -> None:
+ url = HITLOperator.generate_link_to_ui_from_context(
+ context=context,
+ base_url="http://localhost:28080",
+ )
self.log.info(self.message)
+ self.log.info("Url to respond %s", url)
hitl_request_callback = LocalLogNotifier(
diff --git
a/providers/standard/src/airflow/providers/standard/operators/hitl.py
b/providers/standard/src/airflow/providers/standard/operators/hitl.py
index 207771b4f40..bad99f18ee7 100644
--- a/providers/standard/src/airflow/providers/standard/operators/hitl.py
+++ b/providers/standard/src/airflow/providers/standard/operators/hitl.py
@@ -20,26 +20,28 @@ import logging
from airflow.exceptions import AirflowOptionalProviderFeatureException
from airflow.providers.standard.version_compat import AIRFLOW_V_3_1_PLUS
-from airflow.sdk.bases.notifier import BaseNotifier
if not AIRFLOW_V_3_1_PLUS:
raise AirflowOptionalProviderFeatureException("Human in the loop
functionality needs Airflow 3.1+.")
-
from collections.abc import Collection, Mapping, Sequence
from typing import TYPE_CHECKING, Any
+from urllib.parse import ParseResult, urlencode, urlparse, urlunparse
+from airflow.configuration import conf
from airflow.providers.standard.exceptions import HITLTimeoutError,
HITLTriggerEventError
from airflow.providers.standard.operators.branch import BranchMixIn
from airflow.providers.standard.triggers.hitl import HITLTrigger,
HITLTriggerEventSuccessPayload
from airflow.providers.standard.utils.skipmixin import SkipMixin
from airflow.providers.standard.version_compat import BaseOperator
+from airflow.sdk.bases.notifier import BaseNotifier
from airflow.sdk.definitions.param import ParamsDict
from airflow.sdk.execution_time.hitl import upsert_hitl_detail
from airflow.sdk.timezone import utcnow
if TYPE_CHECKING:
from airflow.sdk.definitions.context import Context
+ from airflow.sdk.types import RuntimeTaskInstanceProtocol
class HITLOperator(BaseOperator):
@@ -87,12 +89,33 @@ class HITLOperator(BaseOperator):
self.respondents = [respondents] if isinstance(respondents, str) else
respondents
self.validate_options()
+ self.validate_params()
self.validate_defaults()
def validate_options(self) -> None:
+ """
+ Validate the `options` attribute of the instance.
+
+ Raises:
+ ValueError: If `options` is empty.
+ ValueError: If any option contains a comma (`,`), which is not
allowed.
+ """
if not self.options:
raise ValueError('"options" cannot be empty.')
+ if any("," in option for option in self.options):
+ raise ValueError('"," is not allowed in option')
+
+ def validate_params(self) -> None:
+ """
+ Validate the `params` attribute of the instance.
+
+ Raises:
+ ValueError: If `"_options"` key is present in `params`, which is
not allowed.
+ """
+ if "_options" in self.params:
+ raise ValueError('"_options" is not allowed in params')
+
def validate_defaults(self) -> None:
"""
Validate whether the given defaults pass the following criteria.
@@ -181,6 +204,95 @@ class HITLOperator(BaseOperator):
):
raise ValueError(f"params_input {params_input} does not match
params {self.params}")
+ def generate_link_to_ui(
+ self,
+ *,
+ task_instance: RuntimeTaskInstanceProtocol,
+ base_url: str | None = None,
+ options: str | list[str] | None = None,
+ params_input: dict[str, Any] | None = None,
+ ) -> str:
+ """
+ Generate a URL link to the "required actions" page for a specific task
instance.
+
+ This URL includes query parameters based on allowed options and
parameters.
+
+ Args:
+ task_instance: The task instance to generate the link for.
+ base_url: Optional base URL to use. Defaults to ``api.base_url``
from config.
+ options: Optional subset of allowed options to include in the URL.
+ params_input: Optional subset of allowed params to include in the
URL.
+
+ Raises:
+ ValueError: If any provided option or parameter is invalid.
+ ValueError: If no base_url can be determined.
+
+ Returns:
+ The full URL pointing to the required actions page with query
parameters.
+ """
+ query_param: dict[str, Any] = {}
+ options = [options] if isinstance(options, str) else options
+ if options:
+ if diff := set(options) - set(self.options):
+ raise ValueError(f"options {diff} are not valid options")
+ query_param["_options"] = ",".join(options)
+
+ if params_input:
+ if diff := set(params_input.keys()) - set(self.params.keys()):
+ raise ValueError(f"params {diff} are not valid params")
+ query_param.update(params_input)
+
+ if not (base_url := base_url or conf.get("api", "base_url",
fallback=None)):
+ raise ValueError("Not able to retrieve base_url")
+
+ query_param["map_index"] = task_instance.map_index
+
+ parsed_base_url: ParseResult = urlparse(base_url)
+ return urlunparse(
+ (
+ parsed_base_url.scheme,
+ parsed_base_url.netloc,
+
f"/dags/{task_instance.dag_id}/runs/{task_instance.run_id}/tasks/{task_instance.task_id}/required_actions",
+ "",
+ urlencode(query_param) if query_param else "",
+ "",
+ )
+ )
+
+ @staticmethod
+ def generate_link_to_ui_from_context(
+ *,
+ context: Context,
+ base_url: str | None = None,
+ options: list[str] | None = None,
+ params_input: dict[str, Any] | None = None,
+ ) -> str:
+ """
+ Generate a "required actions" page URL from a task context.
+
+ Delegates to ``generate_link_to_ui`` using the task and task_instance
extracted from
+ the provided context.
+
+ Args:
+ context: The Airflow task context containing 'task' and
'task_instance'.
+ base_url: Optional base URL to use.
+ options: Optional list of allowed options to include.
+ params_input: Optional dictionary of allowed parameters to include.
+
+ Returns:
+ The full URL pointing to the required actions page with query
parameters.
+ """
+ hitl_op = context["task"]
+ if not isinstance(hitl_op, HITLOperator):
+ raise ValueError("This method only supports HITLOperator")
+
+ return hitl_op.generate_link_to_ui(
+ task_instance=context["task_instance"],
+ base_url=base_url,
+ options=options,
+ params_input=params_input,
+ )
+
class ApprovalOperator(HITLOperator, SkipMixin):
"""Human-in-the-loop Operator that has only 'Approval' and 'Reject'
options."""
diff --git a/providers/standard/tests/unit/standard/operators/test_hitl.py
b/providers/standard/tests/unit/standard/operators/test_hitl.py
index df5777547d9..7d311dec451 100644
--- a/providers/standard/tests/unit/standard/operators/test_hitl.py
+++ b/providers/standard/tests/unit/standard/operators/test_hitl.py
@@ -31,7 +31,7 @@ import pytest
from sqlalchemy import select
from airflow.exceptions import DownstreamTasksSkipped
-from airflow.models import Trigger
+from airflow.models import TaskInstance, Trigger
from airflow.models.hitl import HITLDetail
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.hitl import (
@@ -43,6 +43,8 @@ from airflow.providers.standard.operators.hitl import (
from airflow.sdk import Param, timezone
from airflow.sdk.definitions.param import ParamsDict
+from tests_common.test_utils.config import conf_vars
+
if TYPE_CHECKING:
from sqlalchemy.orm import Session
@@ -54,6 +56,23 @@ DEFAULT_DATE = timezone.datetime(2016, 1, 1)
INTERVAL = datetime.timedelta(hours=12)
+@pytest.fixture
+def hitl_task_and_ti_for_generating_link(dag_maker: DagMaker) ->
tuple[HITLOperator, TaskInstance]:
+ with dag_maker("test_dag"):
+ task = HITLOperator(
+ task_id="hitl_test",
+ subject="This is subject",
+ options=["1", "2", "3", "4", "5"],
+ body="This is body",
+ defaults=["1"],
+ respondents="test",
+ multiple=True,
+ params=ParamsDict({"input_1": 1, "input_2": 2, "input_3": 3}),
+ )
+ dr = dag_maker.create_dagrun()
+ return task, dag_maker.run_ti(task.task_id, dr)
+
+
class TestHITLOperator:
def test_validate_options(self) -> None:
hitl_op = HITLOperator(
@@ -65,20 +84,42 @@ class TestHITLOperator:
multiple=False,
params=ParamsDict({"input_1": 1}),
)
- hitl_op.validate_defaults()
+ hitl_op.validate_options()
- def test_validate_options_with_empty_options(self) -> None:
- with pytest.raises(ValueError, match='"options" cannot be empty.'):
+ @pytest.mark.parametrize(
+ "options, expected_err_msg",
+ [
+ ([], '"options" cannot be empty.'),
+ (["1,1", "1", "2"], '"," is not allowed in option'),
+ ],
+ ids=["empty", "comma"],
+ )
+ def test_validate_options_with_empty_options(self, options: list[str],
expected_err_msg: str) -> None:
+ # validate_options is called during initialization
+ with pytest.raises(ValueError, match=expected_err_msg):
HITLOperator(
task_id="hitl_test",
subject="This is subject",
- options=[],
+ options=options,
body="This is body",
defaults=["1"],
multiple=False,
params=ParamsDict({"input_1": 1}),
)
+ def test_validate_params_with__options(self) -> None:
+ # validate_params is called during initialization
+ with pytest.raises(ValueError, match='"_options" is not allowed in
params'):
+ HITLOperator(
+ task_id="hitl_test",
+ subject="This is subject",
+ options=["1", "2"],
+ body="This is body",
+ defaults=["1"],
+ multiple=False,
+ params=ParamsDict({"_options": 1}),
+ )
+
def test_validate_defaults(self) -> None:
hitl_op = HITLOperator(
task_id="hitl_test",
@@ -110,6 +151,7 @@ class TestHITLOperator:
extra_kwargs: dict[str, Any],
expected_error_msg: str,
) -> None:
+ # validate_defaults is called during initialization
with pytest.raises(ValueError, match=expected_error_msg):
HITLOperator(
task_id="hitl_test",
@@ -238,6 +280,117 @@ class TestHITLOperator:
},
)
+ @pytest.mark.parametrize(
+ "options, params_input, expected_query_string",
+ [
+ (None, None, "?map_index=-1"),
+ ("1", None, "?_options=1&map_index=-1"),
+ (["1", "2"], None, "?_options=1%2C2&map_index=-1"),
+ (None, {"input_1": 123}, "?input_1=123&map_index=-1"),
+ (
+ ["3", "4", "5"],
+ {"input_1": 123123, "input_2": 345345},
+
"?_options=3%2C4%2C5&input_1=123123&input_2=345345&map_index=-1",
+ ),
+ ],
+ ids=[
+ "empty",
+ "single-option",
+ "multiple-options",
+ "single-param-input",
+ "multiple-options-and-param-inputs",
+ ],
+ )
+ @pytest.mark.parametrize("base_url", ["http://test", "http://test_2:8080"])
+ def test_generate_link_to_ui(
+ self,
+ base_url: str,
+ options: list[str] | None,
+ params_input: dict[str, Any] | None,
+ expected_query_string: str,
+ hitl_task_and_ti_for_generating_link: tuple[HITLOperator,
TaskInstance],
+ ) -> None:
+ expected_url = (
+
f"{base_url}/dags/test_dag/runs/test/tasks/hitl_test/required_actions{expected_query_string}"
+ )
+ task, ti = hitl_task_and_ti_for_generating_link
+ url = task.generate_link_to_ui(
+ task_instance=ti,
+ base_url=base_url,
+ options=options,
+ params_input=params_input,
+ )
+ assert url == expected_url
+
+ @pytest.mark.parametrize(
+ "options, params_input, expected_query_string",
+ [
+ (None, None, "?map_index=-1"),
+ ("1", None, "?_options=1&map_index=-1"),
+ (["1", "2"], None, "?_options=1%2C2&map_index=-1"),
+ (None, {"input_1": 123}, "?input_1=123&map_index=-1"),
+ (
+ ["3", "4", "5"],
+ {"input_1": 123123, "input_2": 345345},
+
"?_options=3%2C4%2C5&input_1=123123&input_2=345345&map_index=-1",
+ ),
+ ],
+ ids=[
+ "empty",
+ "single-option",
+ "multiple-options",
+ "single-param-input",
+ "multiple-options-and-param-inputs",
+ ],
+ )
+ @conf_vars({("api", "base_url"): "http://localhost:8080/"})
+ def test_generate_link_fall_back_to_conf_api_base_url(
+ self,
+ options: list[str] | None,
+ params_input: dict[str, Any] | None,
+ expected_query_string: str,
+ hitl_task_and_ti_for_generating_link: tuple[HITLOperator,
TaskInstance],
+ ) -> None:
+ task, ti = hitl_task_and_ti_for_generating_link
+ expected_url =
f"http://localhost:8080/dags/test_dag/runs/test/tasks/hitl_test/required_actions{expected_query_string}"
+
+ url = task.generate_link_to_ui(
+ task_instance=ti,
+ options=options,
+ params_input=params_input,
+ )
+ assert url == expected_url
+
+ @pytest.mark.parametrize(
+ "options, params_input, expected_err_msg",
+ [
+ ([100, "2", 30000], None, "options {.*} are not valid options"),
+ (
+ None,
+ {"input_not_exist": 123, "no_such_key": 123},
+ "params {.*} are not valid params",
+ ),
+ ],
+ )
+ def test_generate_link_to_ui_with_invalid_input(
+ self,
+ options: list[str] | None,
+ params_input: dict[str, Any] | None,
+ expected_err_msg: str,
+ hitl_task_and_ti_for_generating_link: tuple[HITLOperator,
TaskInstance],
+ ) -> None:
+ task, ti = hitl_task_and_ti_for_generating_link
+ with pytest.raises(ValueError, match=expected_err_msg):
+ task.generate_link_to_ui(task_instance=ti, options=options,
params_input=params_input)
+
+ def test_generate_link_to_ui_without_base_url(
+ self,
+ hitl_task_and_ti_for_generating_link: tuple[HITLOperator,
TaskInstance],
+ ) -> None:
+ task, ti = hitl_task_and_ti_for_generating_link
+ with pytest.raises(ValueError, match="Not able to retrieve base_url"):
+ task.generate_link_to_ui(task_instance=ti)
+
class TestApprovalOperator:
def test_init_with_options(self) -> None: