This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch release-0.1
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/release-0.1 by this push:
new 7d47787 [doc] Add instructions for java in workflow agent and react
agent doc. (#258)
7d47787 is described below
commit 7d47787f6b200de01ff04e7add41deded152d784
Author: Wenjin Xie <[email protected]>
AuthorDate: Thu Oct 9 17:21:59 2025 +0800
[doc] Add instructions for java in workflow agent and react agent doc.
(#258)
---
docs/content/docs/development/react_agent.md | 138 ++++++++++++
docs/content/docs/development/workflow_agent.md | 265 +++++++++++++++++++++++-
2 files changed, 394 insertions(+), 9 deletions(-)
diff --git a/docs/content/docs/development/react_agent.md b/docs/content/docs/development/react_agent.md
index 64d965b..a22054f 100644
--- a/docs/content/docs/development/react_agent.md
+++ b/docs/content/docs/development/react_agent.md
@@ -28,6 +28,9 @@ ReAct Agent is a general paradigm that combines reasoning and
action capabilitie
## ReAct Agent Example
+{{< tabs "ReAct Agent Example" >}}
+
+{{< tab "Python" >}}
```python
my_react_agent = ReActAgent(
chat_model=chat_model_descriptor,
@@ -35,11 +38,31 @@ my_react_agent = ReActAgent(
output_schema=MyBaseModelDataType, # or output_schema=my_row_type_info
)
```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```java
+ReActAgent myReActAgent =
+ new ReActAgent(
+ chatModelDescriptor,
+ myPrompt,
+ MyBaseModelDataType.class
+ // or myRowTypeInfo
+ );
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
## Initialize Arguments
### Chat Model
Users should specify the chat model used in the ReAct Agent.
We use `ResourceDescriptor` to describe the chat model, including the chat model type and chat model arguments. See [Chat Model]({{< ref "docs/development/chat_models" >}}) for more details.
+
+{{< tabs "ChatModel ResourceDescriptor" >}}
+
+{{< tab "Python" >}}
```python
chat_model_descriptor = ResourceDescriptor(
clazz=OllamaChatModelSetup,
@@ -48,11 +71,30 @@ chat_model_descriptor = ResourceDescriptor(
tools=["my_tool1", "my_tool2"],
)
```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```java
+ResourceDescriptor chatModelDescriptor =
+ ResourceDescriptor.Builder.newBuilder(OllamaChatModelSetup.class.getName())
+ .addInitialArgument("connection", "myOllamaConnection")
+ .addInitialArgument("model", "qwen3:8b")
+ .addInitialArgument(
+ "tools", List.of("myTool1", "myTool2"))
+ .build();
+```
+{{< /tab >}}
+
+{{< /tabs >}}
### Prompt
Users can provide a prompt to instruct the agent.
The prompt should contain two messages. The first message tells the agent what to do and gives input and output examples. The second message tells the agent how to convert the input element to a text string.
+
+{{< tabs "Prompt" >}}
+
+{{< tab "Python" >}}
```python
system_prompt_str = """
Analyze
@@ -84,15 +126,50 @@ my_prompt = Prompt.from_messages(
),
],
)
+```
+{{< /tab >}}
+{{< tab "Java" >}}
+```java
+String systemPromptString =
+ "Analyze ..."
+ + "Example input format:\n"
+ + "..."
+ + "Ensure your response can be parsed by Java JSON, using this format as an example:\n"
+ + "...";
+
+// Prompt for review analysis react agent.
+Prompt myPrompt = new Prompt(
+ Arrays.asList(
+ new ChatMessage(MessageRole.SYSTEM, systemPromptString),
+ new ChatMessage(
+ MessageRole.USER,
+ "{\"id\": \"{id}\",\n" + "\"review\": \"{review}\"}")));
```
+{{< /tab >}}
+
+{{< /tabs >}}
+
If the input element is a primitive type, such as `int` or `str`, the second message should be:
+
+{{< tabs "Prepare Agents Execution Environment" >}}
+
+{{< tab "Python" >}}
```python
ChatMessage(
role=MessageRole.USER,
content="{input}"
)
```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```java
+new ChatMessage(MessageRole.USER, "{input}")
+```
+{{< /tab >}}
+
+{{< /tabs >}}
See [Prompt]({{< ref "docs/development/prompts" >}}) for more details.
@@ -100,6 +177,10 @@ See [Prompt]({{< ref "docs/development/prompts" >}}) for more details.
Users can set an output schema to configure the ReAct Agent's output type. If an output schema is set, the ReAct Agent deserializes the LLM response into the expected type.
The output schema should be `BaseModel` or `RowTypeInfo`.
+
+{{< tabs "Output Schema" >}}
+
+{{< tab "Python" >}}
```python
class MyBaseModelDataType(BaseModel):
id: str
@@ -112,4 +193,61 @@ my_row_type_info = RowTypeInfo(
["id", "score"],
)
```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```java
+@JsonSerialize
+@JsonDeserialize
+public static class MyBaseModelDataType {
+ private final String id;
+ private final int score;
+ private final List<String> reasons;
+
+ @JsonCreator
+ public MyBaseModelDataType(
+ @JsonProperty("id") String id,
+ @JsonProperty("score") int score,
+ @JsonProperty("reasons") List<String> reasons) {
+ this.id = id;
+ this.score = score;
+ this.reasons = reasons;
+ }
+
+ public MyBaseModelDataType() {
+ id = null;
+ score = 0;
+ reasons = List.of();
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public int getScore() {
+ return score;
+ }
+
+ public List<String> getReasons() {
+ return reasons;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "MyBaseModelDataType{id='%s', score=%d, reasons=%s}", id, score, reasons);
+ }
+}
+
+// Currently, RowTypeInfo only supports BasicType fields.
+RowTypeInfo myRowTypeInfo =
+ new RowTypeInfo(
+ new TypeInformation[] {
+ BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
+ },
+ new String[] {"id", "score"});
+```
+{{< /tab >}}
+
+{{< /tabs >}}
diff --git a/docs/content/docs/development/workflow_agent.md b/docs/content/docs/development/workflow_agent.md
index b77e177..9ada099 100644
--- a/docs/content/docs/development/workflow_agent.md
+++ b/docs/content/docs/development/workflow_agent.md
@@ -26,12 +26,15 @@ under the License.
A workflow style agent in Flink-Agents is an agent whose reasoning and
behavior are organized as a directed workflow of modular steps, called actions,
connected by events. This design is inspired by the need to orchestrate
complex, multi-stage tasks in a transparent, extensible, and data-centric way,
leveraging Apache Flink's streaming architecture.
-In Flink-Agents, a workflow agent is defined as a class that inherits from the
`Agent` base class. The agent's logic is expressed as a set of actions, each of
which is a function decorated with `@action(EventType)`. Actions consume
events, perform reasoning or tool calls, and emit new events, which may trigger
downstream actions. This event-driven workflow forms a directed cyclic graph of
computation, where each node is an action and each edge is an event type.
+In Flink-Agents, a workflow agent is defined as a class that inherits from the
`Agent` base class. The agent's logic is expressed as a set of actions, each of
which is a function decorated with `@action(EventType)` in python (or a method
annotated with `@action(listenEvents = {})` in java). Actions consume events,
perform reasoning or tool calls, and emit new events, which may trigger
downstream actions. This event-driven workflow forms a directed cyclic graph of
computation, where each [...]
A workflow agent is well-suited for scenarios where the solution requires
explicit orchestration, branching, or multi-step reasoning, such as data
enrichment, multi-tool pipelines, or complex business logic.
## Workflow Agent Example
+{{< tabs "Workflow Agent Example" >}}
+
+{{< tab "Python" >}}
```python
class ReviewAnalysisAgent(Agent):
"""An agent that uses a large language model (LLM) to analyze product
reviews
@@ -114,15 +117,113 @@ class ReviewAnalysisAgent(Agent):
# To fail the agent, you can raise an exception here.
```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```java
+/**
+ * An agent that uses a large language model (LLM) to analyze product reviews and generate a
+ * satisfaction score and potential reasons for dissatisfaction.
+ *
+ * <p>This agent receives a product review and produces a satisfaction score and a list of reasons
+ * for dissatisfaction. It handles prompt construction, LLM interaction, and output parsing.
+ */
+public class ReviewAnalysisAgent extends Agent {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ @Prompt
+ public static org.apache.flink.agents.api.prompt.Prompt reviewAnalysisPrompt() {
+ return REVIEW_ANALYSIS_PROMPT;
+ }
+
+ @ChatModelSetup
+ public static ResourceDescriptor reviewAnalysisModel() {
+ return ResourceDescriptor.Builder.newBuilder(OllamaChatModelSetup.class.getName())
+ .addInitialArgument("connection", "ollamaChatModelConnection")
+ .addInitialArgument("model", "qwen3:8b")
+ .addInitialArgument("prompt", "reviewAnalysisPrompt")
+ .addInitialArgument("tools", Collections.singletonList("notifyShippingManager"))
+ .addInitialArgument("extract_reasoning", "true")
+ .build();
+ }
+
+ /**
+ * Tool for notifying the shipping manager when product received a negative review due to
+ * shipping damage.
+ *
+ * @param id The id of the product that received a negative review due to shipping damage
+ * @param review The negative review content
+ */
+ @Tool(
+ description =
+ "Notify the shipping manager when product received a negative review due to shipping damage.")
+ public static void notifyShippingManager(
+ @ToolParam(name = "id") String id, @ToolParam(name = "review") String review) {
+ CustomTypesAndResources.notifyShippingManager(id, review);
+ }
+
+ /** Process input event and send chat request for review analysis. */
+ @Action(listenEvents = {InputEvent.class})
+ public static void processInput(InputEvent event, RunnerContext ctx) throws Exception {
+ String input = (String) event.getInput();
+ MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ CustomTypesAndResources.ProductReview inputObj =
+ MAPPER.readValue(input, CustomTypesAndResources.ProductReview.class);
+
+ ctx.getShortTermMemory().set("id", inputObj.getId());
+
+ String content =
+ String.format(
+ "{\n" + "\"id\": %s,\n" + "\"review\": \"%s\"\n" + "}",
+ inputObj.getId(), inputObj.getReview());
+ ChatMessage msg = new ChatMessage(MessageRole.USER, "", Map.of("input", content));
+
+ ctx.sendEvent(new ChatRequestEvent("reviewAnalysisModel", List.of(msg)));
+ }
+
+ @Action(listenEvents = ChatResponseEvent.class)
+ public static void processChatResponse(ChatResponseEvent event, RunnerContext ctx)
+ throws Exception {
+ JsonNode jsonNode = MAPPER.readTree(event.getResponse().getContent());
+ JsonNode scoreNode = jsonNode.findValue("score");
+ JsonNode reasonsNode = jsonNode.findValue("reasons");
+ if (scoreNode == null || reasonsNode == null) {
+ throw new IllegalStateException(
+ "Invalid response from LLM: missing 'score' or 'reasons' field.");
+ }
+ List<String> result = new ArrayList<>();
+ if (reasonsNode.isArray()) {
+ for (JsonNode node : reasonsNode) {
+ result.add(node.asText());
+ }
+ }
+
+ ctx.sendEvent(
+ new OutputEvent(
+ new CustomTypesAndResources.ProductReviewAnalysisRes(
+ ctx.getShortTermMemory().get("id").getValue().toString(),
+ scoreNode.asInt(),
+ result)));
+ }
+}
+```
+{{< /tab >}}
+
+{{< /tabs >}}
## Action
An action is a piece of code that can be executed. Each action listens to at
least one type of event. When an event of the listening type occurs, the action
will be triggered. An action can also generate new events, to trigger other
actions.
-To declare an action in Agent, user can use `@action` to decorate a function of Agent class, and declare the listened event types as decorator parameters.
+To declare an action in Agent, user can use `@action` to decorate a function of Agent class in python (or annotate a method of Agent class in java), and declare the listened event types as decorator/annotation parameters.
+
+The decorated/annotated function signature should be `(Event, RunnerContext) -> None`
-The decorated function signature should be `(Event, RunnerContext) -> None`
+{{< tabs "Action Function" >}}
+
+{{< tab "Python" >}}
```python
class ReviewAnalysisAgent(Agent):
@action(InputEvent)
@@ -130,8 +231,27 @@ class ReviewAnalysisAgent(Agent):
def process_input(event: InputEvent, ctx: RunnerContext) -> None:
# the action logic
```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```java
+public class ReviewAnalysisAgent extends Agent {
+ /** Process input event and send chat request for review analysis. */
+ @Action(listenEvents = {InputEvent.class})
+ public static void processInput(InputEvent event, RunnerContext ctx) throws Exception {
+ // the action logic
+ }
+}
+```
+{{< /tab >}}
+
+{{< /tabs >}}
Inside the function, users can also send new events, either to trigger other actions or to output data.
+
+{{< tabs "Send Event" >}}
+
+{{< tab "Python" >}}
```python
@action(InputEvent)
@staticmethod
@@ -141,6 +261,22 @@ def process_input(event: InputEvent, ctx: RunnerContext) -> None:
# output data to downstream
ctx.send_event(OutputEvent(output=xxx))
```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```java
+@Action(listenEvents = {InputEvent.class})
+public static void processInput(InputEvent event, RunnerContext ctx) throws Exception {
+ // send ChatRequestEvent
+ ctx.sendEvent(new ChatRequestEvent("my_model", messages));
+ // output data to downstream
+ ctx.sendEvent(new OutputEvent(xxx));
+}
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
## Event
Events are messages passed between actions. Events may carry payloads. A
single event may trigger multiple actions if they are all listening to its type.
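The dispatch rule described above can be illustrated with a toy model. The snippet below is only a sketch in plain Java: `ToyEvent`, `ToyDispatcher`, and the string event types are hypothetical names invented for this illustration and are not part of the Flink-Agents API. It shows two actions listening to the same event type, both firing for a single input event, with one of them emitting a follow-up event that triggers a third action.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Toy model only: none of these types belong to the Flink-Agents API.
class EventDispatchSketch {

    /** Hypothetical event: a type name plus a payload. */
    static final class ToyEvent {
        final String type;
        final Object payload;

        ToyEvent(String type, Object payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    /** Hypothetical dispatcher: routes each queued event to every action listening to its type. */
    static final class ToyDispatcher {
        private final Map<String, List<BiConsumer<ToyEvent, ToyDispatcher>>> actions =
                new LinkedHashMap<>();
        private final Deque<ToyEvent> queue = new ArrayDeque<>();
        final List<Object> outputs = new ArrayList<>();

        void register(String eventType, BiConsumer<ToyEvent, ToyDispatcher> action) {
            actions.computeIfAbsent(eventType, k -> new ArrayList<>()).add(action);
        }

        void sendEvent(ToyEvent event) {
            queue.add(event);
        }

        void run(Object input) {
            sendEvent(new ToyEvent("input", input));
            while (!queue.isEmpty()) {
                ToyEvent event = queue.poll();
                if (event.type.equals("output")) {
                    // Output events are collected instead of being dispatched to actions.
                    outputs.add(event.payload);
                    continue;
                }
                List<BiConsumer<ToyEvent, ToyDispatcher>> listeners = actions.get(event.type);
                if (listeners == null) {
                    continue; // no action listens to this event type
                }
                for (BiConsumer<ToyEvent, ToyDispatcher> action : listeners) {
                    action.accept(event, this);
                }
            }
        }
    }

    static List<Object> demo(String input) {
        ToyDispatcher d = new ToyDispatcher();
        // Two actions listen to "input"; both fire for the same event.
        d.register("input", (e, ctx) ->
                ctx.sendEvent(new ToyEvent("output", "upper:" + e.payload.toString().toUpperCase())));
        d.register("input", (e, ctx) ->
                ctx.sendEvent(new ToyEvent("length", e.payload.toString().length())));
        // A third action is triggered by the follow-up "length" event.
        d.register("length", (e, ctx) ->
                ctx.sendEvent(new ToyEvent("output", "length:" + e.payload)));
        d.run(input);
        return d.outputs;
    }

    public static void main(String[] args) {
        System.out.println(demo("ok"));
    }
}
```

Running `main` prints `[upper:OK, length:2]`: the single input event reached both listeners, and the second listener's follow-up event reached the third action.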
@@ -149,13 +285,31 @@ There are 2 special types of event.
* `OutputEvent`: The framework will listen to `OutputEvent`, and convert its
payload in `output` field into outputs of the agent. By generating
`OutputEvent`, actions can emit output data.
Users can define their own events by extending `Event`.
+
+{{< tabs "Custom Event" >}}
+
+{{< tab "Python" >}}
```python
class MyEvent(Event):
value: Any
```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```java
+public class MyEvent extends Event {
+ private Object value;
+}
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
Then, users can define actions that listen to or send `MyEvent`.
-> **Note**: The payload of `Event` should be `BaseModel` serializable.
+{{< hint info >}}
+The payload of a Python `Event` should be `BaseModel` serializable; the payload of a Java `Event` should be JSON serializable.
+{{< /hint >}}
## Built-in Events and Actions
@@ -179,6 +333,9 @@ This corresponds to Flink's Keyed State, which is visible
to processing of multi
User can set and get short-term memory in actions.
+{{< tabs "Basic Usage" >}}
+
+{{< tab "Python" >}}
```python
@action(InputEvent)
@staticmethod
@@ -191,14 +348,38 @@ def first_action(event: InputEvent, ctx: RunnerContext) -> None:
@staticmethod
def second_action(event: ChatResponseEvent, ctx: RunnerContext) -> None:
...
- id=ctx.short_term_memory.get("id"),
+ id = ctx.short_term_memory.get("id"),
...
```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```java
+@Action(listenEvents = {InputEvent.class})
+public static void firstAction(Event event, RunnerContext ctx) throws Exception {
+ ...
+ ctx.getShortTermMemory().set("id", inputObj.getId());
+ ...
+}
+
+@Action(listenEvents = {ChatResponseEvent.class})
+public static void secondAction(Event event, RunnerContext ctx) throws Exception {
+ ...
+ String id = ctx.getShortTermMemory().get("id").getValue().toString();
+ ...
+}
+```
+{{< /tab >}}
+
+{{< /tabs >}}
#### Store Nested Object
Short-term memory is not just a key-value map; users can also store nested objects.
+{{< tabs "Store Nested Object" >}}
+
+{{< tab "Python" >}}
```python
@action(InputEvent)
@staticmethod
@@ -206,12 +387,12 @@ def first_action(event: InputEvent, ctx: RunnerContext) -> None:
...
stm = ctx.short_term_memory
- # create nested memory object, and thenset the leaf value
- nested_obj = ctx.new_object("a")
+ # create nested memory object, and then set the leaf value
+ nested_obj = stm.new_object("a")
nested_obj.set("b", input.id)
# directly set leaf value, will auto create the nested object
- ctx.set("x.y", input.user)
+ stm.set("x.y", input.user)
...
@action(ChatResponseEvent)
@@ -228,12 +409,51 @@ def second_action(event: InputEvent, ctx: RunnerContext) -> None:
user = nested_obj.get("y")
...
```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```java
+@Action(listenEvents = {InputEvent.class})
+public static void firstAction(Event event, RunnerContext ctx) throws Exception {
+ ...
+ MemoryObject stm = ctx.getShortTermMemory();
+
+ // create nested memory object, and then set the leaf value
+ MemoryObject nestedObj = stm.newObject("a", true);
+ nestedObj.set("b", input.getId());
+
+ // directly set leaf value, will auto create the nested object
+ stm.set("x.y", input.getUser());
+ ...
+}
+
+@Action(listenEvents = {ChatResponseEvent.class})
+public static void secondAction(Event event, RunnerContext ctx) throws Exception {
+ ...
+ MemoryObject stm = ctx.getShortTermMemory();
+
+ // directly get leaf value, will auto parse the nested object
+ String id = stm.get("a.b").getValue().toString();
+
+ // get the nested object, and then get the leaf value
+ MemoryObject nestedObj = stm.get("x");
+ String user = nestedObj.get("y").getValue().toString();
+ ...
+}
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
#### Memory Reference
The `set` method of short-term memory returns a `MemoryRef`, which can be treated as a reference to the stored object.
If users want to pass a stored object between actions, they can pass the reference instead, which reduces the payload size of the event.
+{{< tabs "Memory Reference" >}}
+
+{{< tab "Python" >}}
```python
@staticmethod
def first_action(event: Event, ctx: RunnerContext): # noqa D102
@@ -253,4 +473,31 @@ def second_action(event: Event, ctx: RunnerContext): # noqa D102
content_ref: MemoryRef = event.value
processed_data: ProcessedData = stm.get(content_ref)
...
-```
\ No newline at end of file
+```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```java
+@Action(listenEvents = {InputEvent.class})
+public static void firstAction(Event event, RunnerContext ctx) throws Exception {
+ ...
+ MemoryObject stm = ctx.getShortTermMemory();
+
+ MemoryRef dataRef = stm.set(dataPath, dataToStore);
+ ctx.sendEvent(new MyEvent(dataRef));
+ ...
+}
+
+@Action(listenEvents = {MyEvent.class})
+public static void secondAction(Event event, RunnerContext ctx) throws Exception {
+ ...
+ MemoryObject stm = ctx.getShortTermMemory();
+
+ MemoryRef contentRef = event.getValue();
+ ProcessedData processedData = stm.get(contentRef).getValue();
+ ...
+}
+```
+{{< /tab >}}
+
+{{< /tabs >}}
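
To make the idea behind nested paths and memory references more concrete, here is a minimal, self-contained sketch in plain Java. `ToyMemory` and `ToyRef` are hypothetical stand-ins written for this illustration; they are not the Flink-Agents `MemoryObject` or `MemoryRef` classes and make no claim about how those are implemented. The sketch assumes a reference is nothing more than the dotted path of the stored value, so an event can carry the small reference while the value itself stays in memory.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only: ToyMemory and ToyRef are hypothetical, not Flink-Agents classes.
class MemoryRefSketch {

    /** Hypothetical reference: just the dotted path of a stored value. */
    static final class ToyRef {
        final String path;

        ToyRef(String path) {
            this.path = path;
        }
    }

    /** Hypothetical nested key-value store addressed by dotted paths such as "x.y". */
    static final class ToyMemory {
        private final Map<String, Object> root = new LinkedHashMap<>();

        /**
         * Stores a leaf value, creating intermediate objects as needed, and returns a reference.
         * Throws ClassCastException if an intermediate segment already holds a leaf value.
         */
        @SuppressWarnings("unchecked")
        ToyRef set(String path, Object value) {
            String[] parts = path.split("\\.");
            Map<String, Object> node = root;
            for (int i = 0; i < parts.length - 1; i++) {
                node = (Map<String, Object>)
                        node.computeIfAbsent(parts[i], k -> new LinkedHashMap<String, Object>());
            }
            node.put(parts[parts.length - 1], value);
            return new ToyRef(path);
        }

        /** Resolves a dotted path; returns null if any segment is missing. */
        Object get(String path) {
            Object node = root;
            for (String part : path.split("\\.")) {
                if (!(node instanceof Map)) {
                    return null;
                }
                node = ((Map<?, ?>) node).get(part);
            }
            return node;
        }

        /** Resolves a reference previously returned by set. */
        Object get(ToyRef ref) {
            return get(ref.path);
        }
    }

    static String demo() {
        ToyMemory stm = new ToyMemory();
        // The first action stores the value and keeps only the reference.
        ToyRef ref = stm.set("review.text", "arrived damaged");
        // A later action resolves the reference back to the stored value.
        return ref.path + " -> " + stm.get(ref);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this toy model the event payload would contain only the path string, which is the reason passing a reference keeps events small when the stored object is large.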