This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 3b5b84c [docs] Update workflow agent development doc. (#233)
3b5b84c is described below
commit 3b5b84cfbae95fe51b8406971c30187128eb58e3
Author: Wenjin Xie <[email protected]>
AuthorDate: Mon Sep 29 21:05:40 2025 +0800
[docs] Update workflow agent development doc. (#233)
---
docs/content/docs/development/workflow_agent.md | 200 +++++++++++++++++++-----
1 file changed, 159 insertions(+), 41 deletions(-)
diff --git a/docs/content/docs/development/workflow_agent.md
b/docs/content/docs/development/workflow_agent.md
index 873b5ea..254588a 100644
--- a/docs/content/docs/development/workflow_agent.md
+++ b/docs/content/docs/development/workflow_agent.md
@@ -26,7 +26,7 @@ under the License.
A workflow style agent in Flink-Agents is an agent whose reasoning and
behavior are organized as a directed workflow of modular steps, called actions,
connected by events. This design is inspired by the need to orchestrate
complex, multi-stage tasks in a transparent, extensible, and data-centric way,
leveraging Apache Flink's streaming architecture.
-In Flink-Agents, a workflow agent is defined as a class that inherits from the
`Agent` base class. The agent's logic is expressed as a set of actions, each of
which is a function decorated with `@action(EventType)`. Actions consume
events, perform reasoning or tool calls, and emit new events, which may trigger
downstream actions. This event-driven workflow forms a directed acyclic graph
(DAG) of computation, where each node is an action and each edge is an event
type.
+In Flink-Agents, a workflow agent is defined as a class that inherits from the
`Agent` base class. The agent's logic is expressed as a set of actions, each of
which is a function decorated with `@action(EventType)`. Actions consume
events, perform reasoning or tool calls, and emit new events, which may trigger
downstream actions. This event-driven workflow forms a directed cyclic graph of
computation, where each node is an action and each edge is an event type.
A workflow agent is well-suited for scenarios where the solution requires
explicit orchestration, branching, or multi-step reasoning, such as data
enrichment, multi-tool pipelines, or complex business logic.
@@ -46,46 +46,44 @@ class ReviewAnalysisAgent(Agent):
@staticmethod
def review_analysis_prompt() -> Prompt:
"""Prompt for review analysis."""
- prompt_str = """
- Analyze the user review and product information to determine a
- satisfaction score (1-5) and potential reasons for dissatisfaction.
-
- Example input format:
- {{
- "id": "12345",
- "review": "The headphones broke after one week of use. Very poor
quality."
- }}
-
- Ensure your response can be parsed by Python JSON, using this format as an
example:
- {{
- "score": 1,
- "reasons": [
- "poor quality"
- ]
- }}
-
- input:
- {input}
- """
- return Prompt.from_text("review_analysis_prompt", prompt_str)
+ return review_analysis_prompt
+
+ @tool
+ @staticmethod
+ def notify_shipping_manager(id: str, review: str) -> None:
+ """Notify the shipping manager when product received a negative review
due to
+ shipping damage.
+
+ Parameters
+ ----------
+ id : str
+ The id of the product that received a negative review due to
shipping damage
+ review: str
+ The negative review content
+ """
+ # reuse the declared function, but for parsing the tool metadata, we
write doc
+ # string here again.
+ notify_shipping_manager(id=id, review=review)
@chat_model_setup
@staticmethod
- def review_analysis_model() -> Tuple[Type[BaseChatModelSetup], Dict[str,
Any]]:
+ def review_analysis_model() -> ResourceDescriptor:
"""ChatModel which focus on review analysis."""
- return OllamaChatModelSetup, {
- "name": "review_analysis_model",
- "connection": "ollama_server",
- "prompt": "review_analysis_prompt",
- "extract_reasoning": True,
- }
+ return ResourceDescriptor(
+ clazz=OllamaChatModelSetup,
+ connection="ollama_server",
+ model="qwen3:8b",
+ prompt="review_analysis_prompt",
+ tools=["notify_shipping_manager"],
+ extract_reasoning=True,
+ )
@action(InputEvent)
@staticmethod
def process_input(event: InputEvent, ctx: RunnerContext) -> None:
"""Process input event and send chat request for review analysis."""
input: ProductReview = event.input
- ctx.get_short_term_memory().set("id", input.id)
+ ctx.short_term_memory.set("id", input.id)
content = f"""
"id": {input.id},
@@ -103,7 +101,7 @@ class ReviewAnalysisAgent(Agent):
ctx.send_event(
OutputEvent(
output=ProductReviewAnalysisRes(
- id=ctx.get_short_term_memory().get("id"),
+ id=ctx.short_term_memory.get("id"),
score=json_content["score"],
reasons=json_content["reasons"],
)
@@ -119,20 +117,140 @@ class ReviewAnalysisAgent(Agent):
## Action
-{{< hint warning >}}
-**TODO**: How to define an action.
-{{< /hint >}}
+An action is a piece of code that can be executed. Each action listens to at
least one type of event. When an event of a type it listens to occurs, the
action will be triggered. An action can also generate new events to trigger
other actions.
+
+
+To declare an action in an agent, users can apply the `@action` decorator to a
function of the Agent class and pass the listened event types as decorator
parameters.
+
+The decorated function signature should be `(Event, RunnerContext) -> None`:
+```python
+class ReviewAnalysisAgent(Agent):
+ @action(InputEvent)
+ @staticmethod
+ def process_input(event: InputEvent, ctx: RunnerContext) -> None:
+ # the action logic
+```
+Within the function, users can also send new events, either to trigger other
actions or to output data.
+```python
+@action(InputEvent)
+@staticmethod
+def process_input(event: InputEvent, ctx: RunnerContext) -> None:
+ # send ChatRequestEvent
+ ctx.send_event(ChatRequestEvent(model=xxx, messages=xxx))
+ # output data to downstream
+ ctx.send_event(OutputEvent(output=xxx))
+```
## Event
+Events are messages passed between actions. Events may carry payloads. A
single event may trigger multiple actions if they are all listening to its type.
+
+There are two special types of events:
+* `InputEvent`: Generated by the framework, carrying an input data record that
arrives at the agent in its `input` field. Actions listening to `InputEvent`
are the entry points of the agent.
+* `OutputEvent`: The framework listens to `OutputEvent` and converts the
payload in its `output` field into outputs of the agent. By generating
`OutputEvent`, actions can emit output data.
+
+Users can define their own events by extending `Event`.
+```python
+class MyEvent(Event):
+ value: Any
+```
+Then, users can define actions that listen to or send `MyEvent`.
-{{< hint warning >}}
-**TODO**: How to define, send and handle an event, including InputEvent,
OutputEvent, and event between actions.
+> **Note**: The payload of `Event` should be `BaseModel` serializable.
-{{< /hint >}}
+## Built-in Events and Actions
+
+There are several built-in `Event` and `Action` types in Flink-Agents:
+* See [chat with llm]({{< ref "docs/development/chat_with_llm" >}}) for how to
chat with an LLM leveraging built-in actions and events.
+* See [tool use]({{< ref "docs/development/tool_use" >}}) for how to
programmatically use a tool leveraging built-in actions and events.
## Memory
-{{< hint warning >}}
-**TODO**: How to use short-term memory and long-term memory.
-{{< /hint >}}
+Memory is data that will be remembered across actions and agent runs.
+
+### Short-Term Memory
+
+Short-Term Memory is shared across all actions within an agent run, and across
multiple agent runs that have the same input key.
+
+Here, an *agent run* refers to a complete execution of an agent. Each record
from upstream triggers a new run of the agent.
+
+This corresponds to Flink's Keyed State, which is visible to the processing of
multiple records within the same keyed partition, and is not visible to the
processing of data in other keyed partitions.
+
+#### Basic Usage
+
+Users can set and get short-term memory values in actions.
+
+```python
+@action(InputEvent)
+@staticmethod
+def first_action(event: InputEvent, ctx: RunnerContext) -> None:
+ ...
+ ctx.short_term_memory.set("id", input.id)
+ ...
+
+@action(ChatResponseEvent)
+@staticmethod
+def second_action(event: ChatResponseEvent, ctx: RunnerContext) -> None:
+ ...
+ id=ctx.short_term_memory.get("id"),
+ ...
+```
+
+#### Store Nested Object
+
+Short-term memory is not just a key-value map. Users can also store nested objects.
+
+```python
+@action(InputEvent)
+@staticmethod
+def first_action(event: InputEvent, ctx: RunnerContext) -> None:
+ ...
+ stm = ctx.short_term_memory
+
+ # create nested memory object, and then set the leaf value
+ nested_obj = ctx.new_object("a")
+ nested_obj.set("b", input.id)
+
+ # directly set leaf value, will automatically create the nested object
+ ctx.set("x.y", input.user)
+ ...
+
+@action(ChatResponseEvent)
+@staticmethod
+def second_action(event: InputEvent, ctx: RunnerContext) -> None:
+ ...
+ stm = ctx.short_term_memory
+
+ # directly get leaf value, will auto parse the nested object
+ id = stm.get("a.b")
+
+ # get the nested object, and then get the leaf value
+ nested_obj = stm.get("x")
+ user = nested_obj.get("y")
+ ...
+```
+#### Memory Reference
+
+The `set` method of short-term memory returns a `MemoryRef`, which can be
treated as a reference to the stored object.
+
+If users want to pass the stored object between actions, they can pass the
reference instead, which reduces the payload size of the event.
+
+```python
+@staticmethod
+def first_action(event: Event, ctx: RunnerContext): # noqa D102
+ ...
+ stm = ctx.get_short_term_memory()
+
+ data_ref = stm.set(data_path, data_to_store)
+ ctx.send_event(MyEvent(value=data_ref))
+ ...
+@action(MyEvent)
+@staticmethod
+def second_action(event: Event, ctx: RunnerContext): # noqa D102
+ ...
+ stm = ctx.get_short_term_memory()
+
+ content_ref: MemoryRef = event.value
+ processed_data: ProcessedData = stm.get(content_ref)
+ ...
+```
\ No newline at end of file