This is an automated email from the ASF dual-hosted git repository.

wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new efb4e235 [docs] Clarify Workflow Agent event emission and metadata 
contract (#813)
efb4e235 is described below

commit efb4e235aab897d8a9dc54166e0381d0e28aab4c
Author: bosiew.tian <[email protected]>
AuthorDate: Tue Jun 9 16:22:55 2026 +0800

    [docs] Clarify Workflow Agent event emission and metadata contract (#813)
    
    Resolves the two documentation follow-ups in #793 (docs-only, no
    runtime/API change):
    
    1. Split the misleading "Send Event" snippet that sent a ChatRequestEvent
       and an OutputEvent from a single action into separate "trigger another
       action" and "emit downstream output" examples, and add a hint explaining
       that an OutputEvent is emitted downstream immediately (bypassing action
       routing) while other events are routed to their listening actions.
    
    2. Document the base-Event metadata contract for custom event subclasses:
       reconstruction should preserve the base id (both languages; assign it 
last
       in Python because the content-based id regenerates on field changes) and
       sourceTimestamp (Java only, runtime-internal). Update the MyEvent example
       to follow the contract, matching built-in events.
    
    Co-authored-by: bosiew.tian <[email protected]>
---
 docs/content/docs/development/workflow_agent.md | 85 +++++++++++++++++++++----
 1 file changed, 73 insertions(+), 12 deletions(-)

diff --git a/docs/content/docs/development/workflow_agent.md 
b/docs/content/docs/development/workflow_agent.md
index 01073d27..d15b6e92 100644
--- a/docs/content/docs/development/workflow_agent.md
+++ b/docs/content/docs/development/workflow_agent.md
@@ -266,35 +266,67 @@ public class ReviewAnalysisAgent extends Agent {
 
 In the function, user can also send new events, to trigger other actions, or 
output the data.
 
-{{< tabs "Send Event" >}}
+**Trigger another action** — send a built-in or custom event that another 
action listens to:
+
+{{< tabs "Trigger Another Action" >}}
 
 {{< tab "Python" >}}
 ```python
 @action(InputEvent.EVENT_TYPE)
 @staticmethod
 def process_input(event: Event, ctx: RunnerContext) -> None:
-    # send ChatRequestEvent
-    ctx.send_event(ChatRequestEvent(model=xxx, messages=xxx))
-    # output data to downstream
-    ctx.send_event(OutputEvent(output=xxx))
+    # send a ChatRequestEvent to trigger the built-in chat-model action
+    ctx.send_event(ChatRequestEvent(model="my_model", messages=messages))
 ```
 {{< /tab >}}
 
 {{< tab "Java" >}}
 ```java
 @Action(listenEventTypes = {InputEvent.EVENT_TYPE})
-public static void processInput(Event event, RunnerContext ctx) throws 
Exception {
-    InputEvent inputEvent = InputEvent.fromEvent(event);
-    // send ChatRequestEvent
+public static void processInput(Event event, RunnerContext ctx) {
+    // send a ChatRequestEvent to trigger the built-in chat-model action
     ctx.sendEvent(new ChatRequestEvent("my_model", messages));
+}
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+**Emit downstream output** — send an `OutputEvent` to produce an output of the 
agent:
+
+{{< tabs "Emit Output" >}}
+
+{{< tab "Python" >}}
+```python
+@action(ChatResponseEvent.EVENT_TYPE)
+@staticmethod
+def emit_output(event: Event, ctx: RunnerContext) -> None:
+    # output data to downstream
+    ctx.send_event(OutputEvent(output=result))
+```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```java
+@Action(listenEventTypes = {ChatResponseEvent.EVENT_TYPE})
+public static void emitOutput(Event event, RunnerContext ctx) {
     // output data to downstream
-    ctx.sendEvent(new OutputEvent(xxx));
-}   
+    ctx.sendEvent(new OutputEvent(result));
+}
 ```
 {{< /tab >}}
 
 {{< /tabs >}}
 
+{{< hint info >}}
+An `OutputEvent` is collected and emitted to the agent's downstream 
**immediately**, bypassing
+action routing, while other events (such as `ChatRequestEvent`) are routed to 
the actions that
+listen for them. Sending a `ChatRequestEvent` and an `OutputEvent` from the 
same action is valid
+API usage, but it produces both an immediate output and, once the chat 
response is handled, a
+later model-based output. For the normal chat request/response workflow, emit 
the `OutputEvent`
+from the action that handles the `ChatResponseEvent`, as shown above.
+{{< /hint >}}
+
 ### Durable Execution
 
 Use durable execution when you wrap a time-consuming or side-effecting 
operation. The framework persists the result and replays it on recovery when 
the same call is encountered, so the function will not be called again and side 
effects are avoided. When recovery re-enters an action that has not been 
recorded as completed, code outside `durable_execute` / `durable_execute_async` 
will still be re-executed.
@@ -630,7 +662,11 @@ class MyEvent(Event):
     @override
     def from_event(cls, event: Event) -> "MyEvent":
         assert "value" in event.attributes
-        return MyEvent(value=event.attributes["value"])
+        result = MyEvent(value=event.attributes["value"])
+        # Preserve the base event id. Assign it last: the content-based id is
+        # regenerated whenever another field changes.
+        result.id = event.id
+        return result
 
     @property
     def value(self) -> str:
@@ -648,8 +684,20 @@ public class MyEvent extends Event {
         setAttr("value", value);
     }
 
+    @JsonCreator
+    public MyEvent(
+            @JsonProperty("id") UUID id,
+            @JsonProperty("attributes") Map<String, Object> attributes) {
+        super(id, EVENT_TYPE, attributes);
+    }
+
     public static MyEvent fromEvent(Event event) {
-        MyEvent result = new MyEvent((String) event.getAttr("value"));
+        // Preserve the base event id (and sourceTimestamp) so event logs, 
listeners,
+        // correlation, deduplication, and timestamp propagation stay 
consistent.
+        MyEvent result = new MyEvent(event.getId(), new 
HashMap<>(event.getAttributes()));
+        if (event.hasSourceTimestamp()) {
+            result.setSourceTimestamp(event.getSourceTimestamp());
+        }
         return result;
     }
 
@@ -662,6 +710,19 @@ public class MyEvent extends Event {
 
 {{< /tabs >}}
 
+{{< hint info >}}
+When reconstructing a typed event, preserve the base `Event` metadata so that 
event logs,
+listeners, correlation, deduplication, and downstream timestamp propagation 
stay consistent with
+built-in events:
+
+- **`id`**: copy the source event's `id` onto the reconstructed event, as all 
built-in events do
+  in both languages. In Python, assign `result.id = event.id` **last**, 
because the content-based
+  `id` is regenerated whenever any other field changes.
+- **`sourceTimestamp`** (Java only): carry it over with 
`setSourceTimestamp(...)` when
+  `hasSourceTimestamp()` is true, matching built-in Java events. This field is 
runtime-internal and
+  used for timestamp propagation; the Python `Event` has no equivalent.
+{{< /hint >}}
+
 {{< hint info >}}
 All attribute values must be JSON-serializable. In Python, this means 
`BaseModel`-serializable or primitive types. In Java, values must be 
Jackson-serializable.
 {{< /hint >}}

Reply via email to