kaxil opened a new pull request, #68648:
URL: https://github.com/apache/airflow/pull/68648
`AgentOperator` (and the `@task.agent` decorator) ran a fresh, single-turn
conversation on every run. There was no supported way to seed a run with
prior
turns or to persist the resulting transcript for the next run, so
conversational
or iterative agents that resume a conversation across DAG runs could not be
expressed with the operator.
This adds an opt-in `message_history` parameter. When set, the operator
seeds the
run with the prior turns and pushes the full post-run transcript to XCom (key
`message_history`) so the next run can resume. `message_history=None` (the
default) keeps the existing single-turn behavior unchanged, so there is no
impact
on current users.
## How it works
- `message_history` accepts a list of pydantic-ai `ModelMessage` objects or
their
JSON form (`str` / `bytes`), and is a templated field. It is deserialized
via
`ModelMessagesTypeAdapter` and passed to `run_sync(message_history=...)`
on both
the durable and non-durable branches.
- After the run, the transcript (`result.all_messages()`) is serialized and
pushed to XCom under the key `message_history`.
- An empty `[]` / `""` starts a fresh session, so a templated
`{{ ti.xcom_pull(task_ids='ask', key='message_history', default='[]') }}`
works
on the first run (no XCom yet) instead of failing to parse the string
`"None"`.
## Usage
A multi-turn session brackets the agent with a load and a save task. Where
the
transcript is stored (keyed by a session id, e.g. in object storage) is left
to
the DAG:
https://github.com/astronomer/airflow/blob/3378a9c6c3c6e6fe4002a6ad8ccc5b3a08c2bd52/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent.py#L224-L268
<img width="1440" height="1000" alt="image"
src="https://github.com/user-attachments/assets/46382ff6-171a-43ae-a628-e6f66e91cfd4"
/>
<img width="1440" height="1000" alt="image"
src="https://github.com/user-attachments/assets/138738a9-d99b-45a6-85d9-9af6603fed52"
/>
<img width="1706" height="827" alt="image"
src="https://github.com/user-attachments/assets/c34a610c-913f-418b-8c83-0ecaa140bf80"
/>
<img width="1711" height="850" alt="image"
src="https://github.com/user-attachments/assets/14323d19-86c9-4fae-a7b5-b3c74e1dda93"
/>
## Design notes
- **No storage abstraction, by design.** The operator owns the round-trip
(history in, transcript out); it does not own where a session is stored.
Session keying is deployment-specific and belongs in the DAG, and keeping
the
surface this thin avoids committing the provider to a message-persistence
protocol while the upstream ones are still settling.
- **`message_history` cannot be combined with `enable_hitl_review`** (the
operator
raises at construction, mirroring the existing `durable` + HITL guard). The
post-review transcript is not recoverable today (`run_hitl_review` returns
only
the final string), so emitting the pre-review transcript would silently
drop the
human-approved turns. This can be lifted once HITL surfaces the final
history.
## Gotchas
- The transcript is cumulative and grows every turn. For long-running
sessions,
configure an object-storage XCom backend or trim older turns before
feeding the
history back, rather than passing the whole history unbounded.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]