kaxil opened a new pull request, #68648:
URL: https://github.com/apache/airflow/pull/68648

   `AgentOperator` (and the `@task.agent` decorator) ran a fresh, single-turn
   conversation on every run. There was no supported way to seed a run with 
prior
   turns or to persist the resulting transcript for the next run, so 
conversational
   or iterative agents that resume a conversation across DAG runs could not be
   expressed with the operator.
   
   This adds an opt-in `message_history` parameter. When set, the operator 
seeds the
   run with the prior turns and pushes the full post-run transcript to XCom (key
   `message_history`) so the next run can resume. `message_history=None` (the
   default) keeps the existing single-turn behavior unchanged, so there is no 
impact
   on current users.
   
   ## How it works
   
   - `message_history` accepts a list of pydantic-ai `ModelMessage` objects or 
their
     JSON form (`str` / `bytes`), and is a templated field. It is deserialized 
via
     `ModelMessagesTypeAdapter` and passed to `run_sync(message_history=...)` 
on both
     the durable and non-durable branches.
   - After the run, the transcript (`result.all_messages()`) is serialized and
     pushed to XCom under the key `message_history`.
   - An empty `[]` / `""` starts a fresh session, so a templated
     `{{ ti.xcom_pull(task_ids='ask', key='message_history', default='[]') }}` 
works
     on the first run (no XCom yet) instead of failing to parse the string 
`"None"`.
   
   ## Usage
   
   A multi-turn session brackets the agent with a load and a save task. Where 
the
   transcript is stored (keyed by a session id, e.g. in object storage) is left 
to
   the DAG:
   
   
https://github.com/astronomer/airflow/blob/3378a9c6c3c6e6fe4002a6ad8ccc5b3a08c2bd52/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent.py#L224-L268
   
   <img width="1440" height="1000" alt="image" 
src="https://github.com/user-attachments/assets/46382ff6-171a-43ae-a628-e6f66e91cfd4";
 />
   <img width="1440" height="1000" alt="image" 
src="https://github.com/user-attachments/assets/138738a9-d99b-45a6-85d9-9af6603fed52";
 />
   <img width="1706" height="827" alt="image" 
src="https://github.com/user-attachments/assets/c34a610c-913f-418b-8c83-0ecaa140bf80";
 />
   
   <img width="1711" height="850" alt="image" 
src="https://github.com/user-attachments/assets/14323d19-86c9-4fae-a7b5-b3c74e1dda93";
 />
   
   
   ## Design notes
   
   - **No storage abstraction, by design.** The operator owns the round-trip
     (history in, transcript out); it does not own where a session is stored.
     Session keying is deployment-specific and belongs in the DAG, and keeping 
the
     surface this thin avoids committing the provider to a message-persistence
     protocol while the upstream ones are still settling.
   - **`message_history` cannot be combined with `enable_hitl_review`** (the 
operator
     raises at construction, mirroring the existing `durable` + HITL guard). The
     post-review transcript is not recoverable today (`run_hitl_review` returns 
only
     the final string), so emitting the pre-review transcript would silently 
drop the
     human-approved turns. This can be lifted once HITL surfaces the final 
history.
   
   ## Gotchas
   
   - The transcript is cumulative and grows every turn. For long-running 
sessions,
     configure an object-storage XCom backend or trim older turns before 
feeding the
     history back, rather than passing the whole history unbounded.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to