kaxil commented on code in PR #63081:
URL: https://github.com/apache/airflow/pull/63081#discussion_r2901183999


##########
providers/common/ai/src/airflow/providers/common/ai/operators/agent.py:
##########
@@ -91,28 +144,56 @@ def __init__(
         self.enable_tool_logging = enable_tool_logging
         self.agent_params = agent_params or {}
 
+        self.enable_hitl_review = enable_hitl_review
+        self.max_hitl_iterations = max_hitl_iterations
+        self.hitl_timeout = hitl_timeout
+        self.hitl_poll_interval = hitl_poll_interval
+        self.webhook_url = webhook_url

Review Comment:
   `self.webhook_url` is stored but nothing in the mixin, operator, or plugin 
ever reads or posts to it. The docstring says it sends "a JSON notification 
when a session is created or a new output is generated" — users who configure 
this expecting Slack/PagerDuty notifications get silent failure.
   
   Either implement the webhook call or remove the parameter and add it in a 
follow-up.



##########
providers/common/ai/src/airflow/providers/common/ai/mixins/hitl_review.py:
##########
@@ -0,0 +1,227 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import time
+from datetime import timedelta
+from typing import TYPE_CHECKING, Any, Protocol
+
+from pydantic import BaseModel
+
+from airflow.providers.common.ai.utils.hitl_review import (
+    XCOM_AGENT_OUTPUT_PREFIX,
+    XCOM_AGENT_SESSION,
+    XCOM_HUMAN_ACTION,
+    AgentSessionData,
+    HumanActionData,
+    SessionStatus,
+)
+
+log = logging.getLogger(__name__)
+
+if TYPE_CHECKING:
+    from airflow.sdk import Context
+
+
+class HITLReviewProtocol(Protocol):
+    """Attributes that the host operator must provide."""
+
+    enable_hitl_review: bool
+    hitl_timeout: timedelta | None
+    hitl_poll_interval: float
+    prompt: str
+    task_id: str
+    log: Any
+
+
+class HITLReviewMixin:
+    """
+    Mixin that drives an iterative HITL review loop inside ``execute()``.
+
+    After the operator generates its first output, the mixin:
+
+    1. Pushes session metadata and the first agent output to XCom.
+    2. Polls the human action XCom (``airflow_hitl_review_human_action``) at 
``hitl_poll_interval`` seconds.
+    3. When a human sets action to ``changes_requested`` (via the plugin API),
+       calls :meth:`regenerate_with_feedback` and pushes the new agent output.
+    4. When a human sets action to ``approved``, returns the output.
+    5. When a human sets action to ``rejected``, raises a `HITLRejectException`
+
+    The loop stops after ``hitl_timeout``.
+
+    All agent outputs and human feedback are persisted as iteration-keyed
+    XCom entries (``airflow_hitl_review_agent_output_1``, 
``airflow_hitl_review_human_feedback_1``, etc.) for full
+    auditability.
+
+    Operators using this mixin must set:
+
+    - ``enable_hitl_review`` (``bool``)
+    - ``hitl_timeout`` (``timedelta | None``)
+    - ``hitl_poll_interval`` (``float``, seconds)
+    - ``prompt`` (``str``)
+
+    And must implement: meth:`regenerate_with_feedback`.
+    """
+
+    def run_hitl_review(
+        self: HITLReviewProtocol,
+        context: Context,
+        output: Any,
+        *,
+        message_history: Any = None,
+    ) -> str:
+        """
+        Execute the full HITL review loop.
+
+        :param context: Airflow task context.
+        :param output: Initial LLM output (str or BaseModel).
+        :param message_history: Provider-specific conversation state (e.g.
+            pydantic-ai ``list[ModelMessage]``).  Passed to
+            :meth:`regenerate_with_feedback` on each iteration.
+        :returns: The final approved (or max-iteration) output as a string.
+        """
+        output_str = self._to_string(output)
+        ti = context["task_instance"]
+
+        session = AgentSessionData(
+            status=SessionStatus.PENDING_REVIEW,
+            iteration=1,
+            prompt=self.prompt,
+            current_output=output_str,
+        )
+
+        ti.xcom_push(key=XCOM_AGENT_SESSION, 
value=session.model_dump(mode="json"))
+        ti.xcom_push(key=f"{XCOM_AGENT_OUTPUT_PREFIX}1", value=output_str)
+
+        self.log.info(
+            "Feedback session created for %s/%s/%s (poll every %ds).",
+            ti.dag_id,
+            ti.run_id,
+            ti.task_id,
+            self.hitl_poll_interval,
+        )
+
+        deadline = time.monotonic() + self.hitl_timeout.total_seconds() if 
self.hitl_timeout else None
+
+        return self._poll_loop(
+            ti=ti,
+            session=session,
+            message_history=message_history,
+            deadline=deadline,
+        )
+
+    def _poll_loop(
+        self: HITLReviewProtocol,

Review Comment:
   `max_hitl_iterations` is stored on the operator (`agent.py` line 148) but 
never checked here in `_poll_loop`. This loop runs until approve/reject/timeout 
with no iteration cap.
   
   A user who sets `max_hitl_iterations=3` with no timeout gets a task that 
blocks forever through unlimited generate-review cycles, burning LLM API 
credits and holding a worker slot.
   
   Either enforce it here:
   ```python
   if new_iteration > self.max_hitl_iterations:
       self.log.info("Max iterations (%d) reached, returning last output.", 
self.max_hitl_iterations)
       return session.current_output
   ```
   Or remove the parameter until it's implemented. Dead params are confusing.



##########
providers/common/ai/src/airflow/providers/common/ai/plugins/hitl_review.py:
##########
@@ -0,0 +1,530 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import logging
+from pathlib import Path
+from typing import Annotated
+
+from fastapi import Depends, FastAPI, HTTPException, Query
+from fastapi.responses import HTMLResponse
+from fastapi.staticfiles import StaticFiles
+from sqlalchemy import select
+from sqlalchemy.orm import Session
+
+from airflow.models.taskinstance import TaskInstance as TI
+from airflow.models.xcom import XComModel
+from airflow.plugins_manager import AirflowPlugin
+from airflow.providers.common.ai.utils.hitl_review import (
+    XCOM_AGENT_OUTPUT_PREFIX,
+    XCOM_AGENT_SESSION,
+    XCOM_HUMAN_ACTION,
+    XCOM_HUMAN_FEEDBACK_PREFIX,
+    AgentSessionData,
+    HITLReviewResponse,
+    HumanActionData,
+    HumanFeedbackRequest,
+    SessionStatus,
+)
+from airflow.utils.session import create_session
+
+log = logging.getLogger(__name__)
+
+_PLUGIN_PREFIX = "/hitl-review"
+
+
+def _get_session():
+    with create_session(scoped=False) as session:
+        yield session
+
+
+SessionDep = Annotated[Session, Depends(_get_session, scope="function")]
+
+
+def _get_map_index(q: str = Query("-1", alias="map_index")) -> int:
+    """Parse map_index query; use -1 when placeholder unreplaced (e.g. 
``{MAP_INDEX}``) or invalid."""
+    try:
+        return int(q)
+    except (ValueError, TypeError):
+        return -1
+
+
+MapIndexDep = Annotated[int, Depends(_get_map_index)]
+
+
+def _read_xcom(session: Session, *, dag_id: str, run_id: str, task_id: str, 
map_index: int = -1, key: str):
+    """Read a single XCom value from the database."""
+    row = session.scalars(
+        XComModel.get_many(
+            run_id=run_id,
+            key=key,
+            dag_ids=dag_id,
+            task_ids=task_id,
+            map_indexes=map_index,
+            limit=1,
+        )
+    ).first()
+    if row is None:
+        return None
+    return XComModel.deserialize_value(row)
+
+
+def _read_xcom_by_prefix(

Review Comment:
   `_read_xcom_by_prefix` reads `XComModel.value` directly via 
`select(XComModel.key, XComModel.value)`, while `_read_xcom` (line 84) uses 
`XComModel.deserialize_value(row)`. The raw column read works because values 
are stored with `serialize=False`, but if any code path writes to these keys 
with the default `serialize=True`, this function returns the serialized form 
instead of the deserialized one.
   
   Use `XComModel.deserialize_value()` consistently, or add a comment 
documenting the invariant that these keys are always stored with 
`serialize=False`.



##########
providers/common/ai/src/airflow/providers/common/ai/mixins/hitl_review.py:
##########
@@ -0,0 +1,227 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import time
+from datetime import timedelta
+from typing import TYPE_CHECKING, Any, Protocol
+
+from pydantic import BaseModel
+
+from airflow.providers.common.ai.utils.hitl_review import (
+    XCOM_AGENT_OUTPUT_PREFIX,
+    XCOM_AGENT_SESSION,
+    XCOM_HUMAN_ACTION,
+    AgentSessionData,
+    HumanActionData,
+    SessionStatus,
+)
+
+log = logging.getLogger(__name__)
+
+if TYPE_CHECKING:
+    from airflow.sdk import Context
+
+
+class HITLReviewProtocol(Protocol):
+    """Attributes that the host operator must provide."""
+
+    enable_hitl_review: bool
+    hitl_timeout: timedelta | None
+    hitl_poll_interval: float
+    prompt: str
+    task_id: str
+    log: Any
+
+
+class HITLReviewMixin:
+    """
+    Mixin that drives an iterative HITL review loop inside ``execute()``.
+
+    After the operator generates its first output, the mixin:
+
+    1. Pushes session metadata and the first agent output to XCom.
+    2. Polls the human action XCom (``airflow_hitl_review_human_action``) at 
``hitl_poll_interval`` seconds.
+    3. When a human sets action to ``changes_requested`` (via the plugin API),
+       calls :meth:`regenerate_with_feedback` and pushes the new agent output.
+    4. When a human sets action to ``approved``, returns the output.
+    5. When a human sets action to ``rejected``, raises a `HITLRejectException`
+
+    The loop stops after ``hitl_timeout``.
+
+    All agent outputs and human feedback are persisted as iteration-keyed
+    XCom entries (``airflow_hitl_review_agent_output_1``, 
``airflow_hitl_review_human_feedback_1``, etc.) for full
+    auditability.
+
+    Operators using this mixin must set:
+
+    - ``enable_hitl_review`` (``bool``)
+    - ``hitl_timeout`` (``timedelta | None``)
+    - ``hitl_poll_interval`` (``float``, seconds)
+    - ``prompt`` (``str``)
+
+    And must implement: meth:`regenerate_with_feedback`.
+    """
+
+    def run_hitl_review(
+        self: HITLReviewProtocol,
+        context: Context,
+        output: Any,
+        *,
+        message_history: Any = None,
+    ) -> str:
+        """
+        Execute the full HITL review loop.
+
+        :param context: Airflow task context.
+        :param output: Initial LLM output (str or BaseModel).
+        :param message_history: Provider-specific conversation state (e.g.
+            pydantic-ai ``list[ModelMessage]``).  Passed to
+            :meth:`regenerate_with_feedback` on each iteration.
+        :returns: The final approved (or max-iteration) output as a string.
+        """
+        output_str = self._to_string(output)
+        ti = context["task_instance"]
+
+        session = AgentSessionData(
+            status=SessionStatus.PENDING_REVIEW,
+            iteration=1,
+            prompt=self.prompt,
+            current_output=output_str,
+        )
+
+        ti.xcom_push(key=XCOM_AGENT_SESSION, 
value=session.model_dump(mode="json"))
+        ti.xcom_push(key=f"{XCOM_AGENT_OUTPUT_PREFIX}1", value=output_str)
+
+        self.log.info(
+            "Feedback session created for %s/%s/%s (poll every %ds).",
+            ti.dag_id,
+            ti.run_id,
+            ti.task_id,
+            self.hitl_poll_interval,
+        )
+
+        deadline = time.monotonic() + self.hitl_timeout.total_seconds() if 
self.hitl_timeout else None
+
+        return self._poll_loop(
+            ti=ti,
+            session=session,
+            message_history=message_history,
+            deadline=deadline,
+        )
+
+    def _poll_loop(
+        self: HITLReviewProtocol,
+        *,
+        ti: Any,
+        session: AgentSessionData,
+        message_history: Any,
+        deadline: float | None,
+    ) -> str:
+        """
+        Block until the session reaches a terminal state.
+
+        This loops until the human actions approved or rejected or the timeout 
expires.
+        """
+        from airflow.providers.standard.exceptions import (
+            HITLRejectException,
+            HITLTimeoutError,
+        )
+        last_seen_iteration = 0
+
+        while True:
+            if deadline is not None and time.monotonic() > deadline:
+                raise HITLTimeoutError(f"No response within 
{self.hitl_timeout}.")
+
+            time.sleep(self.hitl_poll_interval)
+            action_raw = ti.xcom_pull(key=XCOM_HUMAN_ACTION, 
task_ids=ti.task_id, map_indexes=ti.map_index)
+            if action_raw is None:
+                # Human action may take some time to propagate; it must be 
performed in the UI, after which
+                # the plugin updates XCom with this XCOM_HUMAN_ACTION. Until 
then, continue looping.
+                continue
+
+            try:
+                if isinstance(action_raw, str):
+                    action = HumanActionData.model_validate_json(action_raw)
+                else:
+                    action = HumanActionData.model_validate(action_raw)
+            except Exception:
+                self.log.warning("Malformed human action XCom: %r", action_raw)

Review Comment:
   This `except Exception` catches both Pydantic validation errors (expected) 
and `xcom_pull` failures like DB errors or network issues (not expected). 
Infrastructure failures get logged as "Malformed human action XCom" which is 
misleading.
   
   Split the try/except:
   ```python
   try:
       action_raw = ti.xcom_pull(...)
   except Exception:
       self.log.warning("Failed to pull XCom", exc_info=True)
       continue
   
   if action_raw is None:
       continue
   
   try:
       action = HumanActionData.model_validate(...)
   except Exception:
       self.log.warning("Malformed human action XCom: %r", action_raw)
       continue
   ```



##########
providers/common/ai/src/airflow/providers/common/ai/plugins/www/package.json:
##########
@@ -0,0 +1,44 @@
+{

Review Comment:
   nit: both `package-lock.json` and `pnpm-lock.yaml` are committed. Pick one 
package manager and remove the other lock file. Add a `packageManager` field to 
`package.json` to make it explicit.



##########
providers/common/ai/src/airflow/providers/common/ai/plugins/hitl_review.py:
##########
@@ -0,0 +1,530 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import logging
+from pathlib import Path
+from typing import Annotated
+
+from fastapi import Depends, FastAPI, HTTPException, Query
+from fastapi.responses import HTMLResponse
+from fastapi.staticfiles import StaticFiles
+from sqlalchemy import select
+from sqlalchemy.orm import Session
+
+from airflow.models.taskinstance import TaskInstance as TI
+from airflow.models.xcom import XComModel
+from airflow.plugins_manager import AirflowPlugin
+from airflow.providers.common.ai.utils.hitl_review import (
+    XCOM_AGENT_OUTPUT_PREFIX,
+    XCOM_AGENT_SESSION,
+    XCOM_HUMAN_ACTION,
+    XCOM_HUMAN_FEEDBACK_PREFIX,
+    AgentSessionData,
+    HITLReviewResponse,
+    HumanActionData,
+    HumanFeedbackRequest,
+    SessionStatus,
+)
+from airflow.utils.session import create_session
+
+log = logging.getLogger(__name__)
+
+_PLUGIN_PREFIX = "/hitl-review"
+
+
+def _get_session():
+    with create_session(scoped=False) as session:
+        yield session
+
+
+SessionDep = Annotated[Session, Depends(_get_session, scope="function")]
+
+
+def _get_map_index(q: str = Query("-1", alias="map_index")) -> int:
+    """Parse map_index query; use -1 when placeholder unreplaced (e.g. 
``{MAP_INDEX}``) or invalid."""
+    try:
+        return int(q)
+    except (ValueError, TypeError):
+        return -1
+
+
+MapIndexDep = Annotated[int, Depends(_get_map_index)]
+
+
+def _read_xcom(session: Session, *, dag_id: str, run_id: str, task_id: str, 
map_index: int = -1, key: str):
+    """Read a single XCom value from the database."""
+    row = session.scalars(
+        XComModel.get_many(
+            run_id=run_id,
+            key=key,
+            dag_ids=dag_id,
+            task_ids=task_id,
+            map_indexes=map_index,
+            limit=1,
+        )
+    ).first()
+    if row is None:
+        return None
+    return XComModel.deserialize_value(row)
+
+
+def _read_xcom_by_prefix(
+    session: Session, *, dag_id: str, run_id: str, task_id: str, map_index: 
int = -1, prefix: str
+) -> dict[int, str]:
+    """Read all iteration-keyed XCom entries matching *prefix* (e.g. 
``airflow_hitl_review_agent_output_``)."""
+    query = select(XComModel.key, XComModel.value).where(
+        XComModel.dag_id == dag_id,
+        XComModel.run_id == run_id,
+        XComModel.task_id == task_id,
+        XComModel.map_index == map_index,
+        XComModel.key.like(f"{prefix}%"),
+    )
+    result: dict[int, str] = {}
+    for key, value in session.execute(query).all():
+        suffix = key[len(prefix) :]
+        if suffix.isdigit():
+            result[int(suffix)] = value
+    return result
+
+
+def _write_xcom(
+    session: Session, *, dag_id: str, run_id: str, task_id: str, map_index: 
int = -1, key: str, value
+):
+    """
+    Write (upsert) a single XCom value in the database.
+
+    Uses ``serialize=False`` so the value is stored natively in the JSON column
+    without double-encoding (``XComModel.set`` with the default 
``serialize=True``
+    calls ``json.dumps`` which would wrap dicts in a JSON string).
+    """
+    XComModel.set(
+        key=key,
+        value=value,
+        dag_id=dag_id,
+        task_id=task_id,
+        run_id=run_id,
+        map_index=map_index,
+        serialize=False,
+        session=session,
+    )
+
+
+def _parse_model(model_cls, raw):
+    """Parse a Pydantic model from a value that may be a dict or a JSON 
string."""
+    if isinstance(raw, str):
+        return model_cls.model_validate_json(raw)
+    return model_cls.model_validate(raw)
+
+
+_RUNNING_TI_STATES = frozenset({"running", "deferred", "up_for_retry", 
"queued", "scheduled"})

Review Comment:
   These are hardcoded strings instead of the `TaskInstanceState` enum. If enum 
values change or new intermediate states are added, this set silently becomes 
stale.
   
   ```python
   from airflow.utils.state import TaskInstanceState
   _RUNNING_TI_STATES = frozenset({
       TaskInstanceState.RUNNING,
       TaskInstanceState.DEFERRED,
       TaskInstanceState.UP_FOR_RETRY,
       TaskInstanceState.QUEUED,
       TaskInstanceState.SCHEDULED,
   })
   ```



##########
providers/common/ai/src/airflow/providers/common/ai/operators/agent.py:
##########
@@ -91,28 +144,56 @@ def __init__(
         self.enable_tool_logging = enable_tool_logging
         self.agent_params = agent_params or {}
 
+        self.enable_hitl_review = enable_hitl_review
+        self.max_hitl_iterations = max_hitl_iterations
+        self.hitl_timeout = hitl_timeout
+        self.hitl_poll_interval = hitl_poll_interval
+        self.webhook_url = webhook_url
+
     @cached_property
     def llm_hook(self) -> PydanticAIHook:
         """Return PydanticAIHook for the configured LLM connection."""
         return PydanticAIHook(llm_conn_id=self.llm_conn_id, 
model_id=self.model_id)
 
-    def execute(self, context: Context) -> Any:
+    def _build_agent(self) -> Agent[None, Any]:
+        """Build and return a pydantic-ai Agent from the operator's config."""
         extra_kwargs = dict(self.agent_params)
         if self.toolsets:
             if self.enable_tool_logging:
                 extra_kwargs["toolsets"] = 
wrap_toolsets_for_logging(self.toolsets, self.log)
             else:
                 extra_kwargs["toolsets"] = self.toolsets
-        agent: Agent[None, Any] = self.llm_hook.create_agent(
+        return self.llm_hook.create_agent(
             output_type=self.output_type,
             instructions=self.system_prompt,
             **extra_kwargs,
         )
 
+    def execute(self, context: Context) -> Any:
+        agent = self._build_agent()
         result = agent.run_sync(self.prompt)
         log_run_summary(self.log, result)
         output = result.output
 
+        if self.enable_hitl_review:
+            return self.run_hitl_review(
+                context,
+                output,
+                message_history=result.all_messages(),
+            )
+
         if isinstance(output, BaseModel):
             return output.model_dump()
         return output
+
+    def regenerate_with_feedback(self, *, feedback: str, message_history: Any) 
-> tuple[str, Any]:
+        """Re-run the agent with *feedback* appended to the conversation 
history."""
+        agent = self._build_agent()
+        messages = message_history or []
+        result = agent.run_sync(feedback, message_history=messages)
+        log_run_summary(self.log, result)
+
+        output = result.output

Review Comment:
   nit: no unit test for `regenerate_with_feedback`. The mixin tests mock it 
out entirely. If pydantic-ai's `message_history` API changes, this breaks 
silently.



##########
providers/common/ai/src/airflow/providers/common/ai/plugins/www/src/__init__.py:
##########
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.

Review Comment:
   There are four `__init__.py` files inside the React/TypeScript `www/src/` 
tree (`src/`, `src/components/`, `src/hooks/`, `src/types/`). This is a JS 
project, not a Python package — these files aren't needed and will be included 
in the provider wheel as empty Python modules.
   
   The edge3 provider's `www/` directory doesn't have them. Remove these and 
exclude the paths in pre-commit if needed.



##########
providers/common/ai/src/airflow/providers/common/ai/utils/hitl_review.py:
##########
@@ -0,0 +1,167 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Shared data models, exceptions, and XCom key constants for HITL Review.
+
+Used by both the API-server-side plugin (``plugins.hitl_review``) and the
+worker-side operator mixin (``mixins.hitl_review``).  Depends only on
+``pydantic`` and the standard library.
+
+**Storage**: all session state is persisted as XCom entries on the running
+task instance.  See the *XCom key constants* below for the key naming scheme.
+"""
+
+from __future__ import annotations
+
+from datetime import datetime, timezone
+from enum import Enum
+from typing import Any
+
+from pydantic import BaseModel, Field
+
+"""
+These xcom keys are reserved for agentic operator with HITL feedback loop.
+"""
+
+_XCOM_PREFIX = "airflow_hitl_review_"
+
+XCOM_AGENT_SESSION = f"{_XCOM_PREFIX}agent_session"
+"""Session metadata written by the **worker**.
+
+Value: ``{"status": "...", "iteration": N,
+"prompt": "...", "current_output": "..."}``.
+"""
+
+XCOM_HUMAN_ACTION = f"{_XCOM_PREFIX}human_action"
+"""Human action command written by the **plugin**.
+
+Value: ``{"action": "approve"|"reject"|"changes_requested",
+"feedback": "...", "iteration": N}``.
+"""
+
+XCOM_HITL_CHAT_URL = f"{_XCOM_PREFIX}chat_url"

Review Comment:
   nit: `XCOM_HITL_CHAT_URL` is defined (and tested) but never written or read 
by any functional code. Remove it, or add a comment that it's reserved for 
future use.



##########
providers/common/ai/src/airflow/providers/common/ai/operators/agent.py:
##########
@@ -19,23 +19,51 @@
 from __future__ import annotations
 
 from collections.abc import Sequence
+from datetime import timedelta
 from functools import cached_property
 from typing import TYPE_CHECKING, Any
 
 from pydantic import BaseModel
 
 from airflow.providers.common.ai.hooks.pydantic_ai import PydanticAIHook
+from airflow.providers.common.ai.mixins.hitl_review import HITLReviewMixin
 from airflow.providers.common.ai.utils.logging import log_run_summary, 
wrap_toolsets_for_logging
-from airflow.providers.common.compat.sdk import BaseOperator
+from airflow.providers.common.compat.sdk import BaseOperator, BaseOperatorLink
 
 if TYPE_CHECKING:
     from pydantic_ai import Agent
     from pydantic_ai.toolsets.abstract import AbstractToolset
 
+    from airflow.providers.common.compat.sdk import TaskInstanceKey
     from airflow.sdk import Context
 
 
-class AgentOperator(BaseOperator):
+class HITLReviewLink(BaseOperatorLink):
+    """
+    Link that opens the live chat window for a running feedback session.
+
+    The URL is constructed directly from the task instance key so that the
+    link is available immediately — even while the task is still running —
+    without waiting for an XCom value to be committed.
+    """
+
+    name = "HITL Review"
+
+    def get_link(
+        self,
+        operator: BaseOperator,
+        *,
+        ti_key: TaskInstanceKey,
+    ) -> str:
+        if not getattr(operator, "enable_hitl_review", False):
+            return ""
+        return (
+            f"/hitl-review/chat-by-task"
+            
f"?dag_id={ti_key.dag_id}&run_id={ti_key.run_id}&task_id={ti_key.task_id}"

Review Comment:
   nit: this URL doesn't include `map_index`. For mapped tasks, the chat UI 
will default to `map_index=-1`, which won't find the correct session.
   
   Add `&map_index={ti_key.map_index}`.



##########
providers/common/ai/src/airflow/providers/common/ai/utils/hitl_review.py:
##########
@@ -0,0 +1,167 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Shared data models, exceptions, and XCom key constants for HITL Review.
+
+Used by both the API-server-side plugin (``plugins.hitl_review``) and the
+worker-side operator mixin (``mixins.hitl_review``).  Depends only on
+``pydantic`` and the standard library.
+
+**Storage**: all session state is persisted as XCom entries on the running
+task instance.  See the *XCom key constants* below for the key naming scheme.
+"""
+
+from __future__ import annotations
+
+from datetime import datetime, timezone
+from enum import Enum
+from typing import Any
+
+from pydantic import BaseModel, Field
+
+"""
+These xcom keys are reserved for agentic operator with HITL feedback loop.
+"""
+
+_XCOM_PREFIX = "airflow_hitl_review_"
+
+XCOM_AGENT_SESSION = f"{_XCOM_PREFIX}agent_session"
+"""Session metadata written by the **worker**.
+
+Value: ``{"status": "...", "iteration": N,
+"prompt": "...", "current_output": "..."}``.
+"""
+
+XCOM_HUMAN_ACTION = f"{_XCOM_PREFIX}human_action"
+"""Human action command written by the **plugin**.
+
+Value: ``{"action": "approve"|"reject"|"changes_requested",
+"feedback": "...", "iteration": N}``.
+"""
+
+XCOM_HITL_CHAT_URL = f"{_XCOM_PREFIX}chat_url"
+"""Chat page URL, written by the worker."""
+
+XCOM_AGENT_OUTPUT_PREFIX = f"{_XCOM_PREFIX}agent_output_"
+"""Per-iteration AI output (append-only, written by worker).
+
+Actual key: ``airflow_hitl_review_agent_output_1``, ``_2``, ...
+"""
+
+XCOM_HUMAN_FEEDBACK_PREFIX = f"{_XCOM_PREFIX}human_feedback_"
+"""Per-iteration human feedback (append-only, written by plugin).
+
+Actual key: ``airflow_hitl_review_human_feedback_1``, ``_2``, ...
+"""
+
+class HITLSessionError(Exception):
+    """Raised when a session is in an unexpected state or goes missing."""
+
+
+class SessionStatus(str, Enum):
+    """Lifecycle states of a HITL review session."""
+
+    PENDING_REVIEW = "pending_review"
+    CHANGES_REQUESTED = "changes_requested"
+    APPROVED = "approved"
+    REJECTED = "rejected"
+
+
+class ConversationEntry(BaseModel):
+    """Single turn in the feedback conversation."""
+
+    role: str
+    content: str
+    iteration: int
+    timestamp: datetime = Field(default_factory=lambda: 
datetime.now(timezone.utc))
+
+
+class AgentSessionData(BaseModel):
+    """
+    Session metadata stored in the ``airflow_hitl_review_agent_session`` XCom.
+
+    Written by the **worker** only.
+    """
+
+    status: SessionStatus = SessionStatus.PENDING_REVIEW
+    iteration: int = 1
+    prompt: str = ""
+    current_output: str = ""
+
+
+class HumanActionData(BaseModel):
+    """
+    Human action payload stored in the ``airflow_hitl_review_human_action`` 
XCom.
+
+    Written by the **plugin** only.
+    """
+
+    action: str

Review Comment:
   `action` is typed as `str`, so any value is accepted. The mixin's poll loop 
checks for `"approve"`, `"reject"`, and `"changes_requested"` but silently 
ignores anything else (falls through to the next poll iteration). A typo like 
`"approved"` would cause the worker to loop forever.
   
   Use `Literal["approve", "reject", "changes_requested"]` or an Enum to 
validate at parse time.



##########
providers/common/ai/src/airflow/providers/common/ai/plugins/hitl_review.py:
##########
@@ -0,0 +1,530 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import logging
+from pathlib import Path
+from typing import Annotated
+
+from fastapi import Depends, FastAPI, HTTPException, Query
+from fastapi.responses import HTMLResponse
+from fastapi.staticfiles import StaticFiles
+from sqlalchemy import select
+from sqlalchemy.orm import Session
+
+from airflow.models.taskinstance import TaskInstance as TI
+from airflow.models.xcom import XComModel
+from airflow.plugins_manager import AirflowPlugin
+from airflow.providers.common.ai.utils.hitl_review import (
+    XCOM_AGENT_OUTPUT_PREFIX,
+    XCOM_AGENT_SESSION,
+    XCOM_HUMAN_ACTION,
+    XCOM_HUMAN_FEEDBACK_PREFIX,
+    AgentSessionData,
+    HITLReviewResponse,
+    HumanActionData,
+    HumanFeedbackRequest,
+    SessionStatus,
+)
+from airflow.utils.session import create_session
+
+log = logging.getLogger(__name__)
+
+_PLUGIN_PREFIX = "/hitl-review"
+
+
+def _get_session():
+    with create_session(scoped=False) as session:
+        yield session
+
+
+SessionDep = Annotated[Session, Depends(_get_session, scope="function")]
+
+
+def _get_map_index(q: str = Query("-1", alias="map_index")) -> int:
+    """Parse map_index query; use -1 when placeholder unreplaced (e.g. 
``{MAP_INDEX}``) or invalid."""
+    try:
+        return int(q)
+    except (ValueError, TypeError):
+        return -1
+
+
+MapIndexDep = Annotated[int, Depends(_get_map_index)]
+
+
+def _read_xcom(session: Session, *, dag_id: str, run_id: str, task_id: str, 
map_index: int = -1, key: str):
+    """Read a single XCom value from the database."""
+    row = session.scalars(
+        XComModel.get_many(
+            run_id=run_id,
+            key=key,
+            dag_ids=dag_id,
+            task_ids=task_id,
+            map_indexes=map_index,
+            limit=1,
+        )
+    ).first()
+    if row is None:
+        return None
+    return XComModel.deserialize_value(row)
+
+
+def _read_xcom_by_prefix(
+    session: Session, *, dag_id: str, run_id: str, task_id: str, map_index: 
int = -1, prefix: str
+) -> dict[int, str]:
+    """Read all iteration-keyed XCom entries matching *prefix* (e.g. 
``airflow_hitl_review_agent_output_``)."""
+    query = select(XComModel.key, XComModel.value).where(
+        XComModel.dag_id == dag_id,
+        XComModel.run_id == run_id,
+        XComModel.task_id == task_id,
+        XComModel.map_index == map_index,
+        XComModel.key.like(f"{prefix}%"),
+    )
+    result: dict[int, str] = {}
+    for key, value in session.execute(query).all():
+        suffix = key[len(prefix) :]
+        if suffix.isdigit():
+            result[int(suffix)] = value
+    return result
+
+
+def _write_xcom(
+    session: Session, *, dag_id: str, run_id: str, task_id: str, map_index: 
int = -1, key: str, value
+):
+    """
+    Write (upsert) a single XCom value in the database.
+
+    Uses ``serialize=False`` so the value is stored natively in the JSON column
+    without double-encoding (``XComModel.set`` with the default 
``serialize=True``
+    calls ``json.dumps`` which would wrap dicts in a JSON string).
+    """
+    XComModel.set(
+        key=key,
+        value=value,
+        dag_id=dag_id,
+        task_id=task_id,
+        run_id=run_id,
+        map_index=map_index,
+        serialize=False,
+        session=session,
+    )
+
+
+def _parse_model(model_cls, raw):
+    """Parse a Pydantic model from a value that may be a dict or a JSON 
string."""
+    if isinstance(raw, str):
+        return model_cls.model_validate_json(raw)
+    return model_cls.model_validate(raw)
+
+
+_RUNNING_TI_STATES = frozenset({"running", "deferred", "up_for_retry", 
"queued", "scheduled"})
+
+
+def _is_task_completed(
+    session: Session, *, dag_id: str, run_id: str, task_id: str, map_index: 
int = -1
+) -> bool:
+    """Return True if the task instance is no longer running."""
+    state = session.scalar(
+        select(TI.state).where(
+            TI.dag_id == dag_id,
+            TI.run_id == run_id,
+            TI.task_id == task_id,
+            TI.map_index == map_index,
+        )
+    )
+    if state is None:
+        return True
+    return state not in _RUNNING_TI_STATES
+
+
+def _build_session_response(
+    session: Session, *, dag_id: str, run_id: str, task_id: str, map_index: 
int = -1
+) -> HITLReviewResponse | None:
+    """Build `HITLReviewResponse` from XCom entries."""
+    raw = _read_xcom(
+        session,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+        key=XCOM_AGENT_SESSION,
+    )
+    if raw is None:
+        return None
+    sess_data = _parse_model(AgentSessionData, raw)
+    outputs = _read_xcom_by_prefix(
+        session,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+        prefix=XCOM_AGENT_OUTPUT_PREFIX,
+    )
+    human_responses = _read_xcom_by_prefix(
+        session,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+        prefix=XCOM_HUMAN_FEEDBACK_PREFIX,
+    )
+    completed = _is_task_completed(
+        session,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+    )
+    return HITLReviewResponse.from_xcom(
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        session=sess_data,
+        outputs=outputs,
+        human_entries=human_responses,
+        task_completed=completed,
+    )
+
+
+hitl_review_app = FastAPI(
+    title="HITL Review",
+    description=(
+        "REST API and chat UI for human-in-the-loop LLM feedback sessions.  "
+        "Sessions are stored in XCom entries on the running task instance."
+    ),
+)
+
+
+@hitl_review_app.get("/health")
+async def health() -> dict[str, str]:
+    """Liveness check."""
+    return {"status": "ok"}
+
+
+@hitl_review_app.get("/sessions/find", response_model=HITLReviewResponse)
+async def find_session(
+    db: SessionDep,
+    dag_id: str,
+    task_id: str,
+    run_id: str,
+    map_index: MapIndexDep,
+) -> HITLReviewResponse:
+    """Find the feedback session for a specific task instance."""
+    resp = _build_session_response(
+        db,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+    )
+    if resp is None:
+        task_active = not _is_task_completed(
+            db,
+            dag_id=dag_id,
+            run_id=run_id,
+            task_id=task_id,
+            map_index=map_index,
+        )
+        raise HTTPException(
+            status_code=404,
+            detail={"message": "No matching session found.", "task_active": 
task_active},
+        )
+    return resp
+
+
+@hitl_review_app.post("/sessions/feedback", response_model=HITLReviewResponse)
+async def submit_feedback(
+    body: HumanFeedbackRequest,
+    db: SessionDep,
+    dag_id: str,
+    task_id: str,
+    run_id: str,
+    map_index: MapIndexDep,
+) -> HITLReviewResponse:
+    """Request changes — provide human feedback for the LLM."""
+    raw = _read_xcom(
+        db,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+        key=XCOM_AGENT_SESSION,
+    )
+    if raw is None:
+        raise HTTPException(status_code=404, detail="No matching session 
found.")
+    sess_data = _parse_model(AgentSessionData, raw)
+    if sess_data.status != SessionStatus.PENDING_REVIEW:
+        raise HTTPException(
+            status_code=409,
+            detail=f"Session is '{sess_data.status.value}', expected 
'pending_review'.",
+        )
+
+    iteration = sess_data.iteration
+    _write_xcom(
+        db,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+        key=f"{XCOM_HUMAN_FEEDBACK_PREFIX}{iteration}",
+        value=body.feedback,
+    )
+
+    action = HumanActionData(action="changes_requested", 
feedback=body.feedback, iteration=iteration)
+    _write_xcom(
+        db,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+        key=XCOM_HUMAN_ACTION,
+        value=action.model_dump(mode="json"),
+    )
+
+    sess_data.status = SessionStatus.CHANGES_REQUESTED
+    _write_xcom(
+        db,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+        key=XCOM_AGENT_SESSION,
+        value=sess_data.model_dump(mode="json"),
+    )
+
+    resp = _build_session_response(
+        db,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+    )
+    if resp is None:
+        raise HTTPException(status_code=500, detail="Failed to read session 
after update.")
+    return resp
+
+
+@hitl_review_app.post("/sessions/approve", response_model=HITLReviewResponse)
+async def approve_session(
+    db: SessionDep,
+    dag_id: str,
+    task_id: str,
+    run_id: str,
+    map_index: MapIndexDep,
+) -> HITLReviewResponse:
+    """Approve the current output."""
+    raw = _read_xcom(
+        db,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+        key=XCOM_AGENT_SESSION,
+    )
+    if raw is None:
+        raise HTTPException(status_code=404, detail="No matching session 
found.")
+
+    sess_data = _parse_model(AgentSessionData, raw)
+    if sess_data.status != SessionStatus.PENDING_REVIEW:
+        raise HTTPException(
+            status_code=409,
+            detail=f"Session is '{sess_data.status.value}', expected 
'pending_review'.",
+        )
+
+    action = HumanActionData(action="approve", iteration=sess_data.iteration)
+    _write_xcom(
+        db,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+        key=XCOM_HUMAN_ACTION,
+        value=action.model_dump(mode="json"),
+    )
+
+    sess_data.status = SessionStatus.APPROVED
+    _write_xcom(
+        db,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+        key=XCOM_AGENT_SESSION,
+        value=sess_data.model_dump(mode="json"),
+    )
+
+    resp = _build_session_response(
+        db,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+    )
+    if resp is None:
+        raise HTTPException(status_code=500, detail="Failed to read session 
after update.")
+    return resp
+
+
+@hitl_review_app.post("/sessions/reject", response_model=HITLReviewResponse)
+async def reject_session(
+    db: SessionDep,
+    dag_id: str,
+    task_id: str,
+    run_id: str,
+    map_index: MapIndexDep,
+) -> HITLReviewResponse:
+    """Reject the output."""
+    raw = _read_xcom(
+        db,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+        key=XCOM_AGENT_SESSION,
+    )
+    if raw is None:
+        raise HTTPException(status_code=404, detail="No matching session 
found.")
+    sess_data = _parse_model(AgentSessionData, raw)
+    if sess_data.status != SessionStatus.PENDING_REVIEW:
+        raise HTTPException(
+            status_code=409,
+            detail=f"Session is '{sess_data.status.value}', expected 
'pending_review'.",
+        )
+
+    action = HumanActionData(action="reject", iteration=sess_data.iteration)
+    _write_xcom(
+        db,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+        key=XCOM_HUMAN_ACTION,
+        value=action.model_dump(mode="json"),
+    )
+
+    sess_data.status = SessionStatus.REJECTED
+    _write_xcom(
+        db,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+        key=XCOM_AGENT_SESSION,
+        value=sess_data.model_dump(mode="json"),
+    )
+
+    resp = _build_session_response(
+        db,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+    )
+    if resp is None:
+        raise HTTPException(status_code=500, detail="Failed to read session 
after update.")
+    return resp
+
+
+# -- Chat UI ----------------------------------------------------------------
+
+_CHAT_HTML_SHELL = """\
+<!DOCTYPE html>
+<html lang="en">
+<head>
+<meta charset="UTF-8">
+<meta name="viewport" content="width=device-width, initial-scale=1.0">
+<title>HITL Review</title>
+<style>*{margin:0;padding:0;box-sizing:border-box}</style>
+</head>
+<body>
+<div id="root"></div>
+<script src="__STATIC_PREFIX__/main.js"></script>
+</body>
+</html>
+"""
+
+
+@hitl_review_app.get("/chat", response_class=HTMLResponse)
+async def chat_page(
+    db: SessionDep,
+    dag_id: str,
+    run_id: str,
+    task_id: str,
+    map_index: int = -1,
+) -> HTMLResponse:
+    """Serve the interactive chat window for a feedback session."""
+    run_id = run_id.replace(" ", "+")

Review Comment:
   This `run_id.replace(" ", "+")` is only applied on the `/chat` endpoint. The 
`/chat-by-task`, `/sessions/find`, `/sessions/feedback`, `/sessions/approve`, 
and `/sessions/reject` endpoints don't do it. If `run_id` needs space-to-plus 
normalization (URL form encoding converts `+` to space), it should happen 
consistently across all endpoints or none.
   
   FastAPI handles URL decoding already, so this might be papering over a 
different bug. Either apply it everywhere (shared dependency/middleware) or 
remove it.



##########
providers/common/ai/src/airflow/providers/common/ai/plugins/hitl_review.py:
##########
@@ -0,0 +1,530 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import logging
+from pathlib import Path
+from typing import Annotated
+
+from fastapi import Depends, FastAPI, HTTPException, Query
+from fastapi.responses import HTMLResponse
+from fastapi.staticfiles import StaticFiles
+from sqlalchemy import select
+from sqlalchemy.orm import Session
+
+from airflow.models.taskinstance import TaskInstance as TI
+from airflow.models.xcom import XComModel
+from airflow.plugins_manager import AirflowPlugin
+from airflow.providers.common.ai.utils.hitl_review import (
+    XCOM_AGENT_OUTPUT_PREFIX,
+    XCOM_AGENT_SESSION,
+    XCOM_HUMAN_ACTION,
+    XCOM_HUMAN_FEEDBACK_PREFIX,
+    AgentSessionData,
+    HITLReviewResponse,
+    HumanActionData,
+    HumanFeedbackRequest,
+    SessionStatus,
+)
+from airflow.utils.session import create_session
+
+log = logging.getLogger(__name__)
+
+_PLUGIN_PREFIX = "/hitl-review"
+
+
+def _get_session():
+    with create_session(scoped=False) as session:
+        yield session
+
+
+SessionDep = Annotated[Session, Depends(_get_session, scope="function")]
+
+
+def _get_map_index(q: str = Query("-1", alias="map_index")) -> int:
+    """Parse map_index query; use -1 when placeholder unreplaced (e.g. 
``{MAP_INDEX}``) or invalid."""
+    try:
+        return int(q)
+    except (ValueError, TypeError):
+        return -1
+
+
+MapIndexDep = Annotated[int, Depends(_get_map_index)]
+
+
+def _read_xcom(session: Session, *, dag_id: str, run_id: str, task_id: str, 
map_index: int = -1, key: str):
+    """Read a single XCom value from the database."""
+    row = session.scalars(
+        XComModel.get_many(
+            run_id=run_id,
+            key=key,
+            dag_ids=dag_id,
+            task_ids=task_id,
+            map_indexes=map_index,
+            limit=1,
+        )
+    ).first()
+    if row is None:
+        return None
+    return XComModel.deserialize_value(row)
+
+
+def _read_xcom_by_prefix(
+    session: Session, *, dag_id: str, run_id: str, task_id: str, map_index: 
int = -1, prefix: str
+) -> dict[int, str]:
+    """Read all iteration-keyed XCom entries matching *prefix* (e.g. 
``airflow_hitl_review_agent_output_``)."""
+    query = select(XComModel.key, XComModel.value).where(
+        XComModel.dag_id == dag_id,
+        XComModel.run_id == run_id,
+        XComModel.task_id == task_id,
+        XComModel.map_index == map_index,
+        XComModel.key.like(f"{prefix}%"),
+    )
+    result: dict[int, str] = {}
+    for key, value in session.execute(query).all():
+        suffix = key[len(prefix) :]
+        if suffix.isdigit():
+            result[int(suffix)] = value
+    return result
+
+
+def _write_xcom(
+    session: Session, *, dag_id: str, run_id: str, task_id: str, map_index: 
int = -1, key: str, value
+):
+    """
+    Write (upsert) a single XCom value in the database.
+
+    Uses ``serialize=False`` so the value is stored natively in the JSON column
+    without double-encoding (``XComModel.set`` with the default 
``serialize=True``
+    calls ``json.dumps`` which would wrap dicts in a JSON string).
+    """
+    XComModel.set(
+        key=key,
+        value=value,
+        dag_id=dag_id,
+        task_id=task_id,
+        run_id=run_id,
+        map_index=map_index,
+        serialize=False,
+        session=session,
+    )
+
+
+def _parse_model(model_cls, raw):
+    """Parse a Pydantic model from a value that may be a dict or a JSON 
string."""
+    if isinstance(raw, str):
+        return model_cls.model_validate_json(raw)
+    return model_cls.model_validate(raw)
+
+
+_RUNNING_TI_STATES = frozenset({"running", "deferred", "up_for_retry", 
"queued", "scheduled"})
+
+
+def _is_task_completed(
+    session: Session, *, dag_id: str, run_id: str, task_id: str, map_index: 
int = -1
+) -> bool:
+    """Return True if the task instance is no longer running."""
+    state = session.scalar(
+        select(TI.state).where(
+            TI.dag_id == dag_id,
+            TI.run_id == run_id,
+            TI.task_id == task_id,
+            TI.map_index == map_index,
+        )
+    )
+    if state is None:
+        return True
+    return state not in _RUNNING_TI_STATES
+
+
+def _build_session_response(
+    session: Session, *, dag_id: str, run_id: str, task_id: str, map_index: 
int = -1
+) -> HITLReviewResponse | None:
+    """Build `HITLReviewResponse` from XCom entries."""
+    raw = _read_xcom(
+        session,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+        key=XCOM_AGENT_SESSION,
+    )
+    if raw is None:
+        return None
+    sess_data = _parse_model(AgentSessionData, raw)
+    outputs = _read_xcom_by_prefix(
+        session,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+        prefix=XCOM_AGENT_OUTPUT_PREFIX,
+    )
+    human_responses = _read_xcom_by_prefix(
+        session,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+        prefix=XCOM_HUMAN_FEEDBACK_PREFIX,
+    )
+    completed = _is_task_completed(
+        session,
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        map_index=map_index,
+    )
+    return HITLReviewResponse.from_xcom(
+        dag_id=dag_id,
+        run_id=run_id,
+        task_id=task_id,
+        session=sess_data,
+        outputs=outputs,
+        human_entries=human_responses,
+        task_completed=completed,
+    )
+
+
+hitl_review_app = FastAPI(
+    title="HITL Review",

Review Comment:
   These endpoints have no authentication. In Airflow 3.x, plugin FastAPI 
sub-apps are mounted via `app.mount()` in `init_plugins()` 
(`api_fastapi/app.py:208`), which does NOT propagate route-level auth 
dependencies (`Depends(get_user)`, `requires_access_*`) to the sub-app. The 
`JWTRefreshMiddleware` on the parent app runs for plugin requests but only 
refreshes tokens — it doesn't reject unauthenticated requests.
   
   The edge3 provider handles this correctly by adding 
`Depends(requires_access_view(access_view=AccessView.JOBS))` on every UI-facing 
route (see `edge3/worker_api/routes/ui.py`). This plugin needs the same 
treatment — either add per-route auth dependencies or use a router-level 
dependency.
   
   As-is, anyone with network access to the webserver can approve, reject, or 
submit feedback on any task's output.



##########
providers/common/ai/src/airflow/providers/common/ai/plugins/www/src/components/ChatPage.tsx:
##########
@@ -0,0 +1,290 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import {
+  type FC,
+  type KeyboardEvent,
+  useCallback,
+  useEffect,
+  useRef,
+  useState,
+} from "react";
+
+import { MessageBubble } from "src/components/MessageBubble";
+import { NoSession } from "src/components/NoSession";
+import { useSession } from "src/hooks/useSession";
+
+import styles from "./ChatPage.module.css";
+
+interface ChatPageProps {
+  dagId: string;
+  runId: string;
+  taskId: string;
+  mapIndex: number;
+}
+
+type ConfirmAction = "approve" | "reject" | null;
+
+const STATUS_BADGE: Record<string, { cls: string; label: string }> = {
+  pending_review: { cls: styles.badgePending!, label: "Pending Review" },
+  approved: { cls: styles.badgeApproved!, label: "Approved" },
+  rejected: { cls: styles.badgeRejected!, label: "Rejected" },
+  changes_requested: { cls: styles.badgeChanges!, label: "Regenerating..." },
+};
+
+export const ChatPage: FC<ChatPageProps> = ({ dagId, runId, taskId, mapIndex 
}) => {
+  const { session, error, loading, taskActive, sendFeedback, approve, reject } 
=
+    useSession(dagId, runId, taskId, mapIndex);
+
+  const [feedbackText, setFeedbackText] = useState("");
+  const [confirmAction, setConfirmAction] = useState<ConfirmAction>(null);
+  const [toast, setToast] = useState<{ msg: string; ok: boolean } | 
null>(null);
+  const chatRef = useRef<HTMLDivElement>(null);
+  const textareaRef = useRef<HTMLTextAreaElement>(null);
+
+  useEffect(() => {
+    if (chatRef.current) {
+      chatRef.current.scrollTop = chatRef.current.scrollHeight;
+    }
+  }, [session?.conversation]);
+
+  useEffect(() => {
+    if (toast) {
+      const t = setTimeout(() => setToast(null), 3000);
+      return () => clearTimeout(t);
+    }
+  }, [toast]);
+
+  const autoResize = useCallback(() => {
+    const ta = textareaRef.current;
+    if (ta) {
+      ta.style.height = "auto";
+      ta.style.height = `${ta.scrollHeight}px`;
+    }
+  }, []);
+
+  const handleSend = useCallback(async () => {
+    const text = feedbackText.trim();
+    if (!text) return;
+    try {
+      await sendFeedback(text);
+      setFeedbackText("");
+      setToast({ msg: "Feedback sent", ok: true });
+    } catch (err) {
+      setToast({ msg: err instanceof Error ? err.message : "Error", ok: false 
});
+    }
+  }, [feedbackText, sendFeedback]);
+
+  const handleKeyDown = useCallback(
+    (e: KeyboardEvent) => {
+      if (e.ctrlKey && e.key === "Enter") {
+        void handleSend();
+      }
+    },
+    [handleSend],
+  );
+
+  const execConfirm = useCallback(async () => {
+    const action = confirmAction;
+    setConfirmAction(null);
+    try {
+      if (action === "approve") {
+        await approve();
+        setToast({ msg: "Approved", ok: true });
+      } else if (action === "reject") {
+        await reject();
+        setToast({ msg: "Rejected", ok: true });
+      }
+    } catch (err) {
+      setToast({ msg: err instanceof Error ? err.message : "Error", ok: false 
});
+    }
+  }, [confirmAction, approve, reject]);
+
+  if (loading) {
+    return (
+      <div className={styles.placeholder}>
+        <div className={styles.placeholderCard}>
+          <div className={styles.spinner} />
+          <h2 className={styles.placeholderHeading}>Connecting to session</h2>
+          <p className={styles.placeholderDesc}>
+            Looking up the HITL review session for this task...
+          </p>
+        </div>
+      </div>
+    );
+  }
+
+  if (!session) {
+    if (taskActive === false) {
+      return <NoSession />;
+    }
+
+    return (
+      <div className={styles.placeholder}>
+        <div className={styles.placeholderCard}>
+          <div className={styles.placeholderIcon}>&#x1F4AC;</div>
+          <h2 className={styles.placeholderHeading}>Waiting for session to 
start</h2>
+          <p className={styles.placeholderDesc}>
+            The HITL review session has not been created yet. This usually 
means the
+            task is still starting up. This page will automatically connect 
once the
+            session is available.
+          </p>
+          <div className={styles.placeholderHint}>
+            <span className={styles.dot} />
+            <span className={styles.dot} />
+            <span className={styles.dot} />
+          </div>
+          {error && <p className={styles.placeholderMuted}>{error}</p>}
+        </div>
+      </div>
+    );
+  }
+
+  const isTerminal =
+    session.status === "approved" ||
+    session.status === "rejected" ||
+    session.task_completed;
+  const canAct = session.status === "pending_review" && 
!session.task_completed;
+  const badge = STATUS_BADGE[session.status] ?? 
STATUS_BADGE["pending_review"]!;
+
+  return (
+    <div className={styles.app}>
+      <header className={styles.header}>
+        <h1 className={styles.title}>HITL Review</h1>
+        <div className={styles.meta}>
+          <span><b>Task:</b> {session.task_id}</span>
+          <span><b>DAG:</b> {session.dag_id}</span>
+          <span>
+            <b>Iteration:</b> {session.iteration}/{session.max_iterations}

Review Comment:
   `session.max_iterations` doesn't exist on `SessionResponse` (see 
`feedback.ts`) or `HITLReviewResponse` (Python side). This will render as 
`1/undefined`.
   
   Either add `max_iterations` to both the Python response model and TS type, 
or drop `/{session.max_iterations}` from this line.



##########
providers/common/ai/src/airflow/providers/common/ai/mixins/hitl_review.py:
##########
@@ -0,0 +1,227 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+import time
+from datetime import timedelta
+from typing import TYPE_CHECKING, Any, Protocol
+
+from pydantic import BaseModel
+
+from airflow.providers.common.ai.utils.hitl_review import (
+    XCOM_AGENT_OUTPUT_PREFIX,
+    XCOM_AGENT_SESSION,
+    XCOM_HUMAN_ACTION,
+    AgentSessionData,
+    HumanActionData,
+    SessionStatus,
+)
+
+log = logging.getLogger(__name__)
+
+if TYPE_CHECKING:
+    from airflow.sdk import Context
+
+
+class HITLReviewProtocol(Protocol):
+    """Attributes that the host operator must provide."""
+
+    enable_hitl_review: bool
+    hitl_timeout: timedelta | None
+    hitl_poll_interval: float
+    prompt: str
+    task_id: str
+    log: Any
+
+
+class HITLReviewMixin:
+    """
+    Mixin that drives an iterative HITL review loop inside ``execute()``.
+
+    After the operator generates its first output, the mixin:
+
+    1. Pushes session metadata and the first agent output to XCom.
+    2. Polls the human action XCom (``airflow_hitl_review_human_action``) at 
``hitl_poll_interval`` seconds.
+    3. When a human sets action to ``changes_requested`` (via the plugin API),
+       calls :meth:`regenerate_with_feedback` and pushes the new agent output.
+    4. When a human sets action to ``approved``, returns the output.
+    5. When a human sets action to ``rejected``, raises a `HITLRejectException`
+
+    The loop stops after ``hitl_timeout``.
+
+    All agent outputs and human feedback are persisted as iteration-keyed
+    XCom entries (``airflow_hitl_review_agent_output_1``, 
``airflow_hitl_review_human_feedback_1``, etc.) for full
+    auditability.
+
+    Operators using this mixin must set:
+
+    - ``enable_hitl_review`` (``bool``)
+    - ``hitl_timeout`` (``timedelta | None``)
+    - ``hitl_poll_interval`` (``float``, seconds)
+    - ``prompt`` (``str``)
+
+    And must implement: meth:`regenerate_with_feedback`.
+    """
+
+    def run_hitl_review(
+        self: HITLReviewProtocol,
+        context: Context,
+        output: Any,
+        *,
+        message_history: Any = None,
+    ) -> str:
+        """
+        Execute the full HITL review loop.
+
+        :param context: Airflow task context.
+        :param output: Initial LLM output (str or BaseModel).
+        :param message_history: Provider-specific conversation state (e.g.
+            pydantic-ai ``list[ModelMessage]``).  Passed to
+            :meth:`regenerate_with_feedback` on each iteration.
+        :returns: The final approved (or max-iteration) output as a string.
+        """
+        output_str = self._to_string(output)
+        ti = context["task_instance"]
+
+        session = AgentSessionData(
+            status=SessionStatus.PENDING_REVIEW,
+            iteration=1,
+            prompt=self.prompt,
+            current_output=output_str,
+        )
+
+        ti.xcom_push(key=XCOM_AGENT_SESSION, 
value=session.model_dump(mode="json"))
+        ti.xcom_push(key=f"{XCOM_AGENT_OUTPUT_PREFIX}1", value=output_str)
+
+        self.log.info(
+            "Feedback session created for %s/%s/%s (poll every %ds).",
+            ti.dag_id,
+            ti.run_id,
+            ti.task_id,
+            self.hitl_poll_interval,
+        )
+
+        deadline = time.monotonic() + self.hitl_timeout.total_seconds() if 
self.hitl_timeout else None
+
+        return self._poll_loop(
+            ti=ti,
+            session=session,
+            message_history=message_history,
+            deadline=deadline,
+        )
+
+    def _poll_loop(
+        self: HITLReviewProtocol,
+        *,
+        ti: Any,
+        session: AgentSessionData,
+        message_history: Any,
+        deadline: float | None,
+    ) -> str:
+        """
+        Block until the session reaches a terminal state.
+
+        This loops until the human actions approved or rejected or the timeout 
expires.
+        """
+        from airflow.providers.standard.exceptions import (
+            HITLRejectException,
+            HITLTimeoutError,
+        )
+        last_seen_iteration = 0
+
+        while True:
+            if deadline is not None and time.monotonic() > deadline:
+                raise HITLTimeoutError(f"No response within 
{self.hitl_timeout}.")
+
+            time.sleep(self.hitl_poll_interval)
+            action_raw = ti.xcom_pull(key=XCOM_HUMAN_ACTION, 
task_ids=ti.task_id, map_indexes=ti.map_index)

Review Comment:
   `time.sleep` blocks the worker process/thread. With a 10-second poll 
interval and potentially long review times (30+ minutes), the worker slot is 
completely occupied doing nothing.
   
   I know we discussed on Slack why XCom polling was chosen over the 
deferral/Triggerer pattern (which the standard provider's `HITLOperator` 
already uses). Worth including that rationale in the PR description so 
reviewers have context on the tradeoff.
   
   Also worth adding a prominent note in the docs that each HITL task holds a 
worker slot for the entire review duration.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to