This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git

commit afbde33c0bd7217ac807b1eb5c971fcce55d19a7
Author: WenjinXie <[email protected]>
AuthorDate: Thu Oct 9 12:46:39 2025 +0800

    [doc] Add instructions for java in quickstart doc.
---
 .../docs/get-started/quickstart/react_agent.md     | 157 ++++++++++++++-
 .../docs/get-started/quickstart/workflow_agent.md  | 220 +++++++++++++++++++--
 .../org/apache/flink/agents/plan/JavaFunction.java |   1 +
 3 files changed, 358 insertions(+), 20 deletions(-)

diff --git a/docs/content/docs/get-started/quickstart/react_agent.md 
b/docs/content/docs/get-started/quickstart/react_agent.md
index 74d7efe..613049b 100644
--- a/docs/content/docs/get-started/quickstart/react_agent.md
+++ b/docs/content/docs/get-started/quickstart/react_agent.md
@@ -35,6 +35,10 @@ The **Review Analysis** agent processes a stream of product 
reviews and uses a s
 ### Prepare Agents Execution Environment
 
 Create the agents execution environment, and register the available chat model 
connections and tools to the environment. 
+
+{{< tabs "Prepare Agents Execution Environment" >}}
+
+{{< tab "Python" >}}
 ```python
 # Set up the Flink streaming environment and the Agents execution environment.
 env = StreamExecutionEnvironment.get_execution_environment()
@@ -49,9 +53,39 @@ agents_env.add_resource(
     "notify_shipping_manager", Tool.from_callable(notify_shipping_manager)
 )
 ```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```Java
+// Set up the Flink streaming environment and the Agents execution environment.
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+AgentsExecutionEnvironment agentsEnv =
+        AgentsExecutionEnvironment.getExecutionEnvironment(env);
+
+// Add Ollama chat model connection and record shipping question tool to be 
used
+// by the Agent.
+agentsEnv
+        .addResource(
+                "ollamaChatModelConnection",
+                ResourceType.CHAT_MODEL_CONNECTION,
+                CustomTypesAndResources.OLLAMA_SERVER_DESCRIPTOR)
+        .addResource(
+                "notifyShippingManager",
+                ResourceType.TOOL,
+                org.apache.flink.agents.api.tools.Tool.fromMethod(
+                        ReActAgentExample.class.getMethod(
+                                "notifyShippingManager", String.class, 
String.class)));
+```
+{{< /tab >}}
+
+{{< /tabs >}}
 
 ### Create the ReAct Agent
 Create the ReAct Agent instance, configure the chat model, prompt and the 
output schema of result to be used.
+
+{{< tabs "Create the ReAct Agent" >}}
+
+{{< tab "Python" >}}
 ```python
 review_analysis_react_agent = ReActAgent(
     chat_model=ResourceDescriptor(
@@ -64,9 +98,35 @@ review_analysis_react_agent = ReActAgent(
     output_schema=ProductReviewAnalysisRes,
 )
 ```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```Java
+// Create ReAct agent.
+ReActAgent reviewAnalysisReactAgent = getReActAgent();
+
+private static ReActAgent getReActAgent() {
+    return new ReActAgent(
+            
ResourceDescriptor.Builder.newBuilder(OllamaChatModelSetup.class.getName())
+                    .addInitialArgument("connection", 
"ollamaChatModelConnection")
+                    .addInitialArgument("model", "qwen3:8b")
+                    .addInitialArgument(
+                            "tools", 
Collections.singletonList("notifyShippingManager"))
+                    .build(),
+            reviewAnalysisReactPrompt(),
+            CustomTypesAndResources.ProductReviewAnalysisRes.class);
+}
+```
+{{< /tab >}}
+
+{{< /tabs >}}
 
 ### Integrate the ReAct Agent with Flink
 Produce a source DataStream by reading a product review text file, and use the 
ReAct Agent to analyze the review and generate result DataStream. Finally print 
the result DataStream.
+
+{{< tabs "Integrate the ReAct Agent with Flink" >}}
+
+{{< tab "Python" >}}
 ```python
 # Read product reviews from a text file as a streaming source.
 # Each line in the file should be a JSON string representing a ProductReview.
@@ -98,6 +158,54 @@ review_analysis_res_stream = (
 # Print the analysis results to stdout.
 review_analysis_res_stream.print()
 ```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```Java
+// Read product reviews from input_data.txt file as a streaming source.
+// Each element represents a ProductReview.
+DataStream<Row> productReviewStream =
+        env.fromSource(
+                        FileSource.forRecordStreamFormat(
+                                        new TextLineFormat(),
+                                        new Path(
+                                                Objects.requireNonNull(
+                                                                
ReActAgentExample.class
+                                                                        
.getClassLoader()
+                                                                        
.getResource(
+                                                                               
 "input_data.txt"))
+                                                        .getPath()))
+                                .monitorContinuously(Duration.ofMinutes(1))
+                                .build(),
+                        WatermarkStrategy.noWatermarks(),
+                        "react-agent-example")
+                .map(
+                        inputStr -> {
+                            Row row = Row.withNames();
+                            CustomTypesAndResources.ProductReview 
productReview =
+                                    MAPPER.readValue(
+                                            inputStr,
+                                            
CustomTypesAndResources.ProductReview.class);
+                            row.setField("id", productReview.getId());
+                            row.setField("review", productReview.getReview());
+                            return row;
+                        });
+
+
+// Use the ReAct agent to analyze each product review and
+// record shipping question.
+DataStream<Object> reviewAnalysisResStream =
+        agentsEnv
+                .fromDataStream(productReviewStream)
+                .apply(reviewAnalysisReactAgent)
+                .toDataStream();
+
+// Print the analysis results to stdout.
+reviewAnalysisResStream.print();
+```
+{{< /tab >}}
+
+{{< /tabs >}}
 
 ## Run the Example
 
@@ -130,18 +238,45 @@ We recommend creating a Python virtual environment to 
install the Flink Agents P
 
 Follow the [installation]({{< ref "docs/get-started/installation" >}}) 
instructions to install the Flink Agents Python and Java libraries.
 
+#### Clone the Flink Agents repo
+
+Clone the Flink Agents repo to get quickstart example code.
+```bash
+git clone https://github.com/apache/flink-agents.git
+```
+
 #### Deploy a Standalone Flink Cluster
 
 You can deploy a standalone Flink cluster in your local environment with the 
following command.
 
+{{< tabs "Deploy a Standalone Flink Cluster" >}}
+
+{{< tab "Python" >}}
 ```bash
 export PYTHONPATH=$(python -c 'import sysconfig; 
print(sysconfig.get_paths()["purelib"])')
 ./flink-1.20.3/bin/start-cluster.sh
 ```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+1. Build Flink Agents from source to generate example jar. See 
[installation]({{< ref "docs/get-started/installation" >}}) for more details. 
+2. Copy the Flink Agents example jar to Flink lib directory
+    ```bash
+    cp flink-agents/examples/target/flink-agents-examples-$VERSION.jar 
./flink-1.20.3/lib/
+    ```
+3. Start the Flink cluster
+    ```bash
+    ./flink-1.20.3/bin/start-cluster.sh
+    ```
+{{< /tab >}}
+
+{{< /tabs >}}
 You can refer to the [local 
cluster](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/try-flink/local_installation/#starting-and-stopping-a-local-cluster)
 instructions for more detailed step.
 
 
-> **NOTE:** If you can't navigate to the web UI at 
[localhost:8081](localhost:8081), you can find the reason in 
`./flink-1.20.3/log`. If the reason is port conflict, you can change the port 
in `./flink-1.20.3/conf/config.yaml`.
+{{< hint info >}}
+If you can't navigate to the web UI at [localhost:8081](localhost:8081), you 
can find the reason in `./flink-1.20.3/log`. If the reason is port conflict, 
you can change the port in `./flink-1.20.3/conf/config.yaml`.
+{{< /hint >}}
 
 #### Prepare Ollama
 
@@ -155,21 +290,29 @@ ollama run qwen3:8b
 
 ### Submit Flink Agents Job to Standalone Flink Cluster
 
-#### Clone the Flink Agents repo
+#### Submit to Flink Cluster
 
-```bash
-git clone https://github.com/apache/flink-agents.git
-```
+{{< tabs "Submit to Flink Cluster" >}}
 
-#### Submit to Flink Cluster
+{{< tab "Python" >}}
 ```bash
 export PYTHONPATH=$(python -c 'import sysconfig; 
print(sysconfig.get_paths()["purelib"])')
 
 # Run review analysis example
 ./flink-1.20.3/bin/flink run -py 
./flink-agents/python/flink_agents/examples/quickstart/react_agent_example.py
 ```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```bash
+./flink-1.20.3/bin/flink run -c 
org.apache.flink.agents.examples.ReActAgentExample 
./flink-agents/examples/target/flink-agents-examples-$VERSION.jar
+```
+{{< /tab >}}
+
+{{< /tabs >}}
 
-Now you should see a Flink job submitted to the Flink Cluster in Flink web UI 
[localhost:8081](localhost:8081)
+Now you should see a Flink job submitted to the Flink Cluster in Flink web UI 
[localhost:8081](
+localhost:8081)
 
 After a few minutes, you can check for the output in the TaskManager output 
log.
 
diff --git a/docs/content/docs/get-started/quickstart/workflow_agent.md 
b/docs/content/docs/get-started/quickstart/workflow_agent.md
index 76b3c63..e58dbbe 100644
--- a/docs/content/docs/get-started/quickstart/workflow_agent.md
+++ b/docs/content/docs/get-started/quickstart/workflow_agent.md
@@ -40,6 +40,9 @@ Together, these examples show how to build a multi-agent 
workflow with Flink Age
 
 Create the agents execution environment, and register the available chat model 
connections, which can be used by the agents, to the environment.
 
+{{< tabs "Prepare Agents Execution Environment" >}}
+
+{{< tab "Python" >}}
 ```python
 # Set up the Flink streaming environment and the Agents execution environment.
 env = StreamExecutionEnvironment.get_execution_environment()
@@ -52,11 +55,32 @@ agents_env.add_resource(
     ollama_server_descriptor,
 )
 ```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```Java
+// Set up the Flink streaming environment and the Agents execution environment.
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+AgentsExecutionEnvironment agentsEnv =
+        AgentsExecutionEnvironment.getExecutionEnvironment(env);
+
+// Add Ollama chat model connection to be used by the ReviewAnalysisAgent.
+agentsEnv.addResource(
+        "ollamaChatModelConnection",
+        ResourceType.CHAT_MODEL_CONNECTION,
+        CustomTypesAndResources.OLLAMA_SERVER_DESCRIPTOR);
+```
+{{< /tab >}}
+
+{{< /tabs >}}
 
 ### Create the Agents
 
 Below is the example code for the `ReviewAnalysisAgent`, which is used to 
analyze the product reviews and generate a satisfaction score and potential 
reasons for dissatisfaction. It demonstrates how to define the prompt, tool, 
chat model, and action for the agent. Also, it shows how to process the chat 
response and send the output event. For more details, please refer to the 
[Workflow Agent]({{< ref "docs/development/workflow_agent" >}}) documentation.
 
+{{< tabs "Create the Agents" >}}
+
+{{< tab "Python" >}}
 ```python
 class ReviewAnalysisAgent(Agent):
     """An agent that uses a large language model (LLM) to analyze product 
reviews
@@ -139,6 +163,100 @@ class ReviewAnalysisAgent(Agent):
 
             # To fail the agent, you can raise an exception here.
 ```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```Java
+/**
+ * An agent that uses a large language model (LLM) to analyze product reviews 
and generate a
+ * satisfaction score and potential reasons for dissatisfaction.
+ *
+ * <p>This agent receives a product review and produces a satisfaction score 
and a list of reasons
+ * for dissatisfaction. It handles prompt construction, LLM interaction, and 
output parsing.
+ */
+public class ReviewAnalysisAgent extends Agent {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    @Prompt
+    public static org.apache.flink.agents.api.prompt.Prompt 
reviewAnalysisPrompt() {
+        return REVIEW_ANALYSIS_PROMPT;
+    }
+
+    @ChatModelSetup
+    public static ResourceDescriptor reviewAnalysisModel() {
+        return 
ResourceDescriptor.Builder.newBuilder(OllamaChatModelSetup.class.getName())
+                .addInitialArgument("connection", "ollamaChatModelConnection")
+                .addInitialArgument("model", "qwen3:8b")
+                .addInitialArgument("prompt", "reviewAnalysisPrompt")
+                .addInitialArgument("tools", 
Collections.singletonList("notifyShippingManager"))
+                .addInitialArgument("extract_reasoning", "true")
+                .build();
+    }
+
+    /**
+     * Tool for notifying the shipping manager when product received a 
negative review due to
+     * shipping damage.
+     *
+     * @param id The id of the product that received a negative review due to 
shipping damage
+     * @param review The negative review content
+     */
+    @Tool(
+            description =
+                    "Notify the shipping manager when product received a 
negative review due to shipping damage.")
+    public static void notifyShippingManager(
+            @ToolParam(name = "id") String id, @ToolParam(name = "review") 
String review) {
+        CustomTypesAndResources.notifyShippingManager(id, review);
+    }
+
+    /** Process input event and send chat request for review analysis. */
+    @Action(listenEvents = {InputEvent.class})
+    public static void processInput(InputEvent event, RunnerContext ctx) 
throws Exception {
+        String input = (String) event.getInput();
+        MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+        CustomTypesAndResources.ProductReview inputObj =
+                MAPPER.readValue(input, 
CustomTypesAndResources.ProductReview.class);
+
+        ctx.getShortTermMemory().set("id", inputObj.getId());
+
+        String content =
+                String.format(
+                        "{\n" + "\"id\": %s,\n" + "\"review\": \"%s\"\n" + "}",
+                        inputObj.getId(), inputObj.getReview());
+        ChatMessage msg = new ChatMessage(MessageRole.USER, "", 
Map.of("input", content));
+
+        ctx.sendEvent(new ChatRequestEvent("reviewAnalysisModel", 
List.of(msg)));
+    }
+
+    @Action(listenEvents = ChatResponseEvent.class)
+    public static void processChatResponse(ChatResponseEvent event, 
RunnerContext ctx)
+            throws Exception {
+        JsonNode jsonNode = MAPPER.readTree(event.getResponse().getContent());
+        JsonNode scoreNode = jsonNode.findValue("score");
+        JsonNode reasonsNode = jsonNode.findValue("reasons");
+        if (scoreNode == null || reasonsNode == null) {
+            throw new IllegalStateException(
+                    "Invalid response from LLM: missing 'score' or 'reasons' 
field.");
+        }
+        List<String> result = new ArrayList<>();
+        if (reasonsNode.isArray()) {
+            for (JsonNode node : reasonsNode) {
+                result.add(node.asText());
+            }
+        }
+
+        ctx.sendEvent(
+                new OutputEvent(
+                        new CustomTypesAndResources.ProductReviewAnalysisRes(
+                                
ctx.getShortTermMemory().get("id").getValue().toString(),
+                                scoreNode.asInt(),
+                                result)));
+    }
+}
+```
+{{< /tab >}}
+
+{{< /tabs >}}
 
 The code for the `ProductSuggestionAgent`, which is used to generate product 
improvement suggestions based on the aggregated analysis results, is similar to 
the `ReviewAnalysisAgent`.
 
@@ -146,6 +264,9 @@ The code for the `ProductSuggestionAgent`, which is used to 
generate product imp
 
 Create the input DataStream by reading the product reviews from a text file as 
a streaming source, and use the `ReviewAnalysisAgent` to analyze the product 
reviews and generate the result DataStream. Finally print the result DataStream.
 
+{{< tabs "Integrate the Agents with Flink" >}}
+
+{{< tab "Python" >}}
 ```python
 # Read product reviews from a text file as a streaming source.
 # Each line in the file should be a JSON string representing a ProductReview.
@@ -178,8 +299,40 @@ review_analysis_res_stream.print()
 # Execute the Flink pipeline.
 agents_env.execute()
 ```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```Java
+// Read product reviews from input_data.txt file as a streaming source.
+// Each element represents a ProductReview.
+DataStream<String> productReviewStream =
+       env.fromSource(
+               FileSource.forRecordStreamFormat(
+                               new TextLineInputFormat(),
+                               new Path(inputDataFile.getAbsolutePath()))
+                       .build(),
+               WatermarkStrategy.noWatermarks(),
+               "streaming-agent-example");
+
+// Use the ReviewAnalysisAgent to analyze each product review.
+DataStream<Object> reviewAnalysisResStream =
+       agentsEnv
+               .fromDataStream(productReviewStream)
+               .apply(new ReviewAnalysisAgent())
+               .toDataStream();
+
+// Print the analysis results to stdout.
+reviewAnalysisResStream.print();
+
+// Execute the Flink pipeline.
+agentsEnv.execute();
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Run the Example
 
-## Run the Examples
 ### Prerequisites
 
 * Unix-like environment (we use Linux, Mac OS X, Cygwin, WSL)
@@ -205,40 +358,67 @@ You can refer to the [local 
installation](https://nightlies.apache.org/flink/fli
 
 #### Prepare Flink Agents
 
-We recommand creating a Python virtual environment to install the Flink Agents 
Python library.
+We recommend creating a Python virtual environment to install the Flink Agents 
Python library.
 
 Follow the [installation]({{< ref "docs/get-started/installation" >}}) 
instructions to install the Flink Agents Python and Java libraries.
 
-#### Prepare Ollama
-
-Download and install Ollama from the official 
[website](https://ollama.com/download).
-
-Then pull the qwen3:8b model, which is required by the quickstart examples
+#### Clone the Flink Agents repo
 
+Clone the Flink Agents repo to get quickstart example code.
 ```bash
-ollama pull qwen3:8b
+git clone https://github.com/apache/flink-agents.git
 ```
 
 #### Deploy a Standalone Flink Cluster
 
 You can deploy a standalone Flink cluster in your local environment with the 
following command.
 
+{{< tabs "Deploy a Standalone Flink Cluster" >}}
+
+{{< tab "Python" >}}
 ```bash
 export PYTHONPATH=$(python -c 'import sysconfig; 
print(sysconfig.get_paths()["purelib"])')
 ./flink-1.20.3/bin/start-cluster.sh
 ```
+{{< /tab >}}
 
-You should be able to navigate to the web UI at 
[localhost:8081](localhost:8081) to view the Flink dashboard and see that the 
cluster is up and running.
+{{< tab "Java" >}}
+1. Build Flink Agents from source to generate example jar. See 
[installation]({{< ref "docs/get-started/installation" >}}) for more details.
+2. Copy the Flink Agents example jar to Flink lib directory
+    ```bash
+    cp flink-agents/examples/target/flink-agents-examples-$VERSION.jar 
./flink-1.20.3/lib/
+    ```
+3. Start the Flink cluster
+    ```bash
+    ./flink-1.20.3/bin/start-cluster.sh
+    ```
+{{< /tab >}}
 
-### Submit Flink Agents Job to Standalone Flink Cluster
+{{< /tabs >}}
+You can refer to the [local 
cluster](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/try-flink/local_installation/#starting-and-stopping-a-local-cluster)
 instructions for more detailed step.
 
-#### Clone the Flink Agents repo
+
+{{< hint info >}}
+If you can't navigate to the web UI at [localhost:8081](localhost:8081), you 
can find the reason in `./flink-1.20.3/log`. If the reason is port conflict, 
you can change the port in `./flink-1.20.3/conf/config.yaml`.
+{{< /hint >}}
+
+#### Prepare Ollama
+
+Download and install Ollama from the official 
[website](https://ollama.com/download).
+
+Then run the qwen3:8b model, which is required by the quickstart examples
 
 ```bash
-git clone https://github.com/apache/flink-agents.git
+ollama run qwen3:8b
 ```
 
+### Submit Flink Agents Job to Standalone Flink Cluster
+
 #### Submit to Flink Cluster
+
+{{< tabs "Submit to Flink Cluster" >}}
+
+{{< tab "Python" >}}
 ```bash
 export PYTHONPATH=$(python -c 'import sysconfig; 
print(sysconfig.get_paths()["purelib"])')
 
@@ -248,7 +428,21 @@ export PYTHONPATH=$(python -c 'import sysconfig; 
print(sysconfig.get_paths()["pu
 # Run product suggestion example
 ./flink-1.20.3/bin/flink run -py 
./flink-agents/python/flink_agents/examples/quickstart/workflow_multiple_agent_example.py
 ```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```bash
+# Run review analysis example
+./flink-1.20.3/bin/flink run -c 
org.apache.flink.agents.examples.WorkflowSingleAgentExample 
./flink-agents/examples/target/flink-agents-examples-$VERSION.jar
+
+# Run product suggestion example
+./flink-1.20.3/bin/flink run -c 
org.apache.flink.agents.examples.WorkflowMultipleAgentExample 
./flink-agents/examples/target/flink-agents-examples-$VERSION.jar
+```
+{{< /tab >}}
+
+{{< /tabs >}}
 
-Now you should see a Flink job submitted to the Flink Cluster in Flink web UI 
[localhost:8081](localhost:8081)
+Now you should see a Flink job submitted to the Flink Cluster in Flink web UI 
[localhost:8081](
+localhost:8081)
 
 After a few minutes, you can check for the output in the TaskManager output 
log.
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/JavaFunction.java 
b/plan/src/main/java/org/apache/flink/agents/plan/JavaFunction.java
index 2033df6..e007646 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/JavaFunction.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/JavaFunction.java
@@ -49,6 +49,7 @@ public class JavaFunction implements Function {
         this.qualName = qualName;
         this.methodName = methodName;
         this.parameterTypes = parameterTypes;
+        // TODO: support get method loaded by user code classloader.
         this.method = Class.forName(qualName).getMethod(methodName, 
parameterTypes);
     }
 

Reply via email to