This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new 3b5b84c  [docs] Update workflow agent development doc. (#233)
3b5b84c is described below

commit 3b5b84cfbae95fe51b8406971c30187128eb58e3
Author: Wenjin Xie <[email protected]>
AuthorDate: Mon Sep 29 21:05:40 2025 +0800

    [docs] Update workflow agent development doc. (#233)
---
 docs/content/docs/development/workflow_agent.md | 200 +++++++++++++++++++-----
 1 file changed, 159 insertions(+), 41 deletions(-)

diff --git a/docs/content/docs/development/workflow_agent.md 
b/docs/content/docs/development/workflow_agent.md
index 873b5ea..254588a 100644
--- a/docs/content/docs/development/workflow_agent.md
+++ b/docs/content/docs/development/workflow_agent.md
@@ -26,7 +26,7 @@ under the License.
 
 A workflow style agent in Flink-Agents is an agent whose reasoning and 
behavior are organized as a directed workflow of modular steps, called actions, 
connected by events. This design is inspired by the need to orchestrate 
complex, multi-stage tasks in a transparent, extensible, and data-centric way, 
leveraging Apache Flink's streaming architecture.
 
-In Flink-Agents, a workflow agent is defined as a class that inherits from the 
`Agent` base class. The agent's logic is expressed as a set of actions, each of 
which is a function decorated with `@action(EventType)`. Actions consume 
events, perform reasoning or tool calls, and emit new events, which may trigger 
downstream actions. This event-driven workflow forms a directed acyclic graph 
(DAG) of computation, where each node is an action and each edge is an event 
type.
+In Flink-Agents, a workflow agent is defined as a class that inherits from the 
`Agent` base class. The agent's logic is expressed as a set of actions, each of 
which is a function decorated with `@action(EventType)`. Actions consume 
events, perform reasoning or tool calls, and emit new events, which may trigger 
downstream actions. This event-driven workflow forms a directed cyclic graph of 
computation, where each node is an action and each edge is an event type.
 
 A workflow agent is well-suited for scenarios where the solution requires 
explicit orchestration, branching, or multi-step reasoning, such as data 
enrichment, multi-tool pipelines, or complex business logic.
 
@@ -46,46 +46,44 @@ class ReviewAnalysisAgent(Agent):
     @staticmethod
     def review_analysis_prompt() -> Prompt:
         """Prompt for review analysis."""
-        prompt_str = """
-    Analyze the user review and product information to determine a
-    satisfaction score (1-5) and potential reasons for dissatisfaction.
-
-    Example input format:
-    {{
-        "id": "12345",
-        "review": "The headphones broke after one week of use. Very poor 
quality."
-    }}
-
-    Ensure your response can be parsed by Python JSON, using this format as an 
example:
-    {{
-     "score": 1,
-     "reasons": [
-       "poor quality"
-       ]
-    }}
-
-    input:
-    {input}
-    """
-        return Prompt.from_text("review_analysis_prompt", prompt_str)
+        return review_analysis_prompt
+
+    @tool
+    @staticmethod
+    def notify_shipping_manager(id: str, review: str) -> None:
+        """Notify the shipping manager when product received a negative review 
due to
+        shipping damage.
+
+        Parameters
+        ----------
+        id : str
+            The id of the product that received a negative review due to 
shipping damage
+        review: str
+            The negative review content
+        """
+        # reuse the declared function, but for parsing the tool metadata, we 
write doc
+        # string here again.
+        notify_shipping_manager(id=id, review=review)
 
     @chat_model_setup
     @staticmethod
-    def review_analysis_model() -> Tuple[Type[BaseChatModelSetup], Dict[str, 
Any]]:
+    def review_analysis_model() -> ResourceDescriptor:
         """ChatModel which focus on review analysis."""
-        return OllamaChatModelSetup, {
-            "name": "review_analysis_model",
-            "connection": "ollama_server",
-            "prompt": "review_analysis_prompt",
-            "extract_reasoning": True,
-        }
+        return ResourceDescriptor(
+            clazz=OllamaChatModelSetup,
+            connection="ollama_server",
+            model="qwen3:8b",
+            prompt="review_analysis_prompt",
+            tools=["notify_shipping_manager"],
+            extract_reasoning=True,
+        )
 
     @action(InputEvent)
     @staticmethod
     def process_input(event: InputEvent, ctx: RunnerContext) -> None:
         """Process input event and send chat request for review analysis."""
         input: ProductReview = event.input
-        ctx.get_short_term_memory().set("id", input.id)
+        ctx.short_term_memory.set("id", input.id)
 
         content = f"""
             "id": {input.id},
@@ -103,7 +101,7 @@ class ReviewAnalysisAgent(Agent):
             ctx.send_event(
                 OutputEvent(
                     output=ProductReviewAnalysisRes(
-                        id=ctx.get_short_term_memory().get("id"),
+                        id=ctx.short_term_memory.get("id"),
                         score=json_content["score"],
                         reasons=json_content["reasons"],
                     )
@@ -119,20 +117,140 @@ class ReviewAnalysisAgent(Agent):
 
 ## Action
 
-{{< hint warning >}}
-**TODO**: How to define an action.
-{{< /hint >}}
+An action is a piece of code that can be executed. Each action listens to at 
least one type of event. When an event of the listening type occurs, the action 
will be triggered. An action can also generate new events, to trigger other 
actions.
+
+
+To declare an action in Agent, user can use `@action` to decorate a function 
of Agent class, and declare the listened event types as decorator parameters. 
+
+The decorated function signature should be `(Event, RunnerContext) -> None`
+```python
+class ReviewAnalysisAgent(Agent):
+    @action(InputEvent)
+    @staticmethod
+    def process_input(event: InputEvent, ctx: RunnerContext) -> None:
+        # the action logic
+```
 
+In the function, user can also send new events, to trigger other actions, or 
output the data.
+```python
+@action(InputEvent)
+@staticmethod
+def process_input(event: InputEvent, ctx: RunnerContext) -> None:
+    # send ChatRequestEvent
+    ctx.send_event(ChatRequestEvent(model=xxx, messages=xxx))
+    # output data to downstream
+    ctx.send_event(OutputEvent(output=xxx))
+```
 ## Event
+Events are messages passed between actions. Events may carry payloads. A 
single event may trigger multiple actions if they are all listening to its type.
+
+There are 2 special types of event.
+* `InputEvent`: Generated by the framework, carrying an input data record that 
arrives at the agent in `input` field . Actions listening to the `InputEvent` 
will be the entry points of agent.
+* `OutputEvent`: The framework will listen to `OutputEvent`, and convert its 
payload in `output` field into outputs of the agent. By generating 
`OutputEvent`, actions can emit output data.
+
+User can define owner event by extends `Event`.
+```python
+class MyEvent(Event):
+    value: Any
+```
+Then, user can define actions listen to or send `MyEvent`.
 
-{{< hint warning >}}
-**TODO**: How to define, send and handle an event, including InputEvent, 
OutputEvent, and event between actions.
+> **Note**: The payload of `Event` should be `BaseModel` serializable. 
 
-{{< /hint >}}
+## Built-in Events and Actions
+
+There are several built-in `Event` and `Action` in Flink-Agents:
+* See [chat with llm]({{< ref "docs/development/chat_with_llm" >}}) for how to 
chat with a LLM leveraging built-in action and events.
+* See [tool use]({{< ref "docs/development/tool_use" >}}) for how to 
programmatically use a tool leveraging built-in action and events.
 
 ## Memory
 
-{{< hint warning >}}
-**TODO**: How to use short-term memory and long-term memory.
-{{< /hint >}}
+Memory is data that will be remembered across actions and agent runs.
+
+### Short-Term Memory
+
+Short-Term Memory is shared across all actions within an agent run, and 
multiple agent runs with the same input key. 
+
+Here an *agent run* refers to a complete execution of an agent. Each record 
from upstream will trigger a new run of agent.
+
+This corresponds to Flink's Keyed State, which is visible to processing of 
multiple records with in the same keyed partition, and is transparent to 
processing of data in other keyed partitions.
+
+#### Basic Usage
+
+User can set and get short-term memory in actions.
+
+```python
+@action(InputEvent)
+@staticmethod
+def first_action(event: InputEvent, ctx: RunnerContext) -> None:
+    ...
+    ctx.short_term_memory.set("id", input.id)
+    ...
+
+@action(ChatResponseEvent)
+@staticmethod
+def second_action(event: ChatResponseEvent, ctx: RunnerContext) -> None:
+    ...
+    id=ctx.short_term_memory.get("id"),
+    ...
+```
+
+#### Store Nested Object
+
+It's not just a k-v map. User can store nested objects.
+
+```python
+@action(InputEvent)
+@staticmethod
+def first_action(event: InputEvent, ctx: RunnerContext) -> None:
+    ...
+    stm = ctx.short_term_memory
+    
+    # create nested memory object, and thenset the leaf value
+    nested_obj = ctx.new_object("a")
+    nested_obj.set("b", input.id)
+    
+    # directly set leaf value, will auto crate the nested object    
+    ctx.set("x.y", input.user)
+    ...
+    
+@action(ChatResponseEvent)
+@staticmethod
+def second_action(event: InputEvent, ctx: RunnerContext) -> None:
+    ...
+    stm = ctx.short_term_memory
+    
+    # directly get leaf value, will auto parse the nested object
+    id = stm.get("a.b")
+    
+    # get the nested object, and then get the leaf value
+    nested_obj = stm.get("x")
+    user = nested_obj.get("y")
+    ...
+```
+#### Memory Reference
+
+The `set` method of short term memory will return a `MemoryRef`, which can be 
treated as a reference to the stored object. 
+
+If user want to pass the stored object between actions, they can pass the 
reference instead, which can reduce the payload size of Event.
+
+```python
+@staticmethod
+def first_action(event: Event, ctx: RunnerContext):  # noqa D102
+    ...
+    stm = ctx.get_short_term_memory()
+    
+    data_ref = stm.set(data_path, data_to_store)
+    ctx.send_event(MyEvent(value=data_ref))
+    ...
 
+@action(MyEvent)
+@staticmethod
+def second_action(event: Event, ctx: RunnerContext):  # noqa D102
+    ...
+    stm = ctx.get_short_term_memory()
+    
+    content_ref: MemoryRef = event.value
+    processed_data: ProcessedData = stm.get(content_ref)
+    ...
+```
\ No newline at end of file

Reply via email to