This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
commit afbde33c0bd7217ac807b1eb5c971fcce55d19a7 Author: WenjinXie <[email protected]> AuthorDate: Thu Oct 9 12:46:39 2025 +0800 [doc] Add instructions for java in quickstart doc. --- .../docs/get-started/quickstart/react_agent.md | 157 ++++++++++++++- .../docs/get-started/quickstart/workflow_agent.md | 220 +++++++++++++++++++-- .../org/apache/flink/agents/plan/JavaFunction.java | 1 + 3 files changed, 358 insertions(+), 20 deletions(-) diff --git a/docs/content/docs/get-started/quickstart/react_agent.md b/docs/content/docs/get-started/quickstart/react_agent.md index 74d7efe..613049b 100644 --- a/docs/content/docs/get-started/quickstart/react_agent.md +++ b/docs/content/docs/get-started/quickstart/react_agent.md @@ -35,6 +35,10 @@ The **Review Analysis** agent processes a stream of product reviews and uses a s ### Prepare Agents Execution Environment Create the agents execution environment, and register the available chat model connections and tools to the environment. + +{{< tabs "Prepare Agents Execution Environment" >}} + +{{< tab "Python" >}} ```python # Set up the Flink streaming environment and the Agents execution environment. env = StreamExecutionEnvironment.get_execution_environment() @@ -49,9 +53,39 @@ agents_env.add_resource( "notify_shipping_manager", Tool.from_callable(notify_shipping_manager) ) ``` +{{< /tab >}} + +{{< tab "Java" >}} +```Java +// Set up the Flink streaming environment and the Agents execution environment. +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +AgentsExecutionEnvironment agentsEnv = + AgentsExecutionEnvironment.getExecutionEnvironment(env); + +// Add Ollama chat model connection and record shipping question tool to be used +// by the Agent. +agentsEnv + .addResource( + "ollamaChatModelConnection", + ResourceType.CHAT_MODEL_CONNECTION, + CustomTypesAndResources.OLLAMA_SERVER_DESCRIPTOR) + .addResource( + "notifyShippingManager", + ResourceType.TOOL, + org.apache.flink.agents.api.tools.Tool.fromMethod( + ReActAgentExample.class.getMethod( + "notifyShippingManager", String.class, String.class))); +``` +{{< /tab >}} + +{{< /tabs >}} ### Create the ReAct Agent Create the ReAct Agent instance, configure the chat model, prompt and the output schema of result to be used. + +{{< tabs "Create the ReAct Agent" >}} + +{{< tab "Python" >}} ```python review_analysis_react_agent = ReActAgent( chat_model=ResourceDescriptor( @@ -64,9 +98,35 @@ review_analysis_react_agent = ReActAgent( output_schema=ProductReviewAnalysisRes, ) ``` +{{< /tab >}} + +{{< tab "Java" >}} +```Java +// Create ReAct agent. +ReActAgent reviewAnalysisReactAgent = getReActAgent(); + +private static ReActAgent getReActAgent() { + return new ReActAgent( + ResourceDescriptor.Builder.newBuilder(OllamaChatModelSetup.class.getName()) + .addInitialArgument("connection", "ollamaChatModelConnection") + .addInitialArgument("model", "qwen3:8b") + .addInitialArgument( + "tools", Collections.singletonList("notifyShippingManager")) + .build(), + reviewAnalysisReactPrompt(), + CustomTypesAndResources.ProductReviewAnalysisRes.class); +} +``` +{{< /tab >}} + +{{< /tabs >}} ### Integrate the ReAct Agent with Flink Produce a source DataStream by reading a product review text file, and use the ReAct Agent to analyze the review and generate result DataStream. Finally print the result DataStream. + +{{< tabs "Integrate the ReAct Agent with Flink" >}} + +{{< tab "Python" >}} ```python # Read product reviews from a text file as a streaming source. # Each line in the file should be a JSON string representing a ProductReview. @@ -98,6 +158,54 @@ review_analysis_res_stream = ( # Print the analysis results to stdout. review_analysis_res_stream.print() ``` +{{< /tab >}} + +{{< tab "Java" >}} +```Java +// Read product reviews from input_data.txt file as a streaming source. +// Each element represents a ProductReview. +DataStream<Row> productReviewStream = + env.fromSource( + FileSource.forRecordStreamFormat( + new TextLineFormat(), + new Path( + Objects.requireNonNull( + ReActAgentExample.class + .getClassLoader() + .getResource( + "input_data.txt")) + .getPath())) + .monitorContinuously(Duration.ofMinutes(1)) + .build(), + WatermarkStrategy.noWatermarks(), + "react-agent-example") + .map( + inputStr -> { + Row row = Row.withNames(); + CustomTypesAndResources.ProductReview productReview = + MAPPER.readValue( + inputStr, + CustomTypesAndResources.ProductReview.class); + row.setField("id", productReview.getId()); + row.setField("review", productReview.getReview()); + return row; + }); + + +// Use the ReAct agent to analyze each product review and +// record shipping question. +DataStream<Object> reviewAnalysisResStream = + agentsEnv + .fromDataStream(productReviewStream) + .apply(reviewAnalysisReactAgent) + .toDataStream(); + +// Print the analysis results to stdout. +reviewAnalysisResStream.print(); +``` +{{< /tab >}} + +{{< /tabs >}} ## Run the Example @@ -130,18 +238,45 @@ We recommend creating a Python virtual environment to install the Flink Agents P Follow the [installation]({{< ref "docs/get-started/installation" >}}) instructions to install the Flink Agents Python and Java libraries. +#### Clone the Flink Agents repo + +Clone the Flink Agents repo to get quickstart example code. +```bash +git clone https://github.com/apache/flink-agents.git +``` + #### Deploy a Standalone Flink Cluster You can deploy a standalone Flink cluster in your local environment with the following command. +{{< tabs "Deploy a Standalone Flink Cluster" >}} + +{{< tab "Python" >}} ```bash export PYTHONPATH=$(python -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])') ./flink-1.20.3/bin/start-cluster.sh ``` +{{< /tab >}} + +{{< tab "Java" >}} +1. Build Flink Agents from source to generate example jar. See [installation]({{< ref "docs/get-started/installation" >}}) for more details. +2. Copy the Flink Agents example jar to Flink lib directory + ```bash + cp flink-agents/examples/target/flink-agents-examples-$VERSION.jar ./flink-1.20.3/lib/ + ``` +3. Start the Flink cluster + ```bash + ./flink-1.20.3/bin/start-cluster.sh + ``` +{{< /tab >}} + +{{< /tabs >}} You can refer to the [local cluster](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/try-flink/local_installation/#starting-and-stopping-a-local-cluster) instructions for more detailed step. -> **NOTE:** If you can't navigate to the web UI at [localhost:8081](localhost:8081), you can find the reason in `./flink-1.20.3/log`. If the reason is port conflict, you can change the port in `./flink-1.20.3/conf/config.yaml`. +{{< hint info >}} +If you can't navigate to the web UI at [localhost:8081](localhost:8081), you can find the reason in `./flink-1.20.3/log`. If the reason is port conflict, you can change the port in `./flink-1.20.3/conf/config.yaml`. +{{< /hint >}} #### Prepare Ollama @@ -155,21 +290,29 @@ ollama run qwen3:8b ### Submit Flink Agents Job to Standalone Flink Cluster -#### Clone the Flink Agents repo +#### Submit to Flink Cluster -```bash -git clone https://github.com/apache/flink-agents.git -``` +{{< tabs "Submit to Flink Cluster" >}} -#### Submit to Flink Cluster +{{< tab "Python" >}} ```bash export PYTHONPATH=$(python -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])') # Run review analysis example ./flink-1.20.3/bin/flink run -py ./flink-agents/python/flink_agents/examples/quickstart/react_agent_example.py ``` +{{< /tab >}} + +{{< tab "Java" >}} +```bash +./flink-1.20.3/bin/flink run -c org.apache.flink.agents.examples.ReActAgentExample ./flink-agents/examples/target/flink-agents-examples-$VERSION.jar +``` +{{< /tab >}} + +{{< /tabs >}} -Now you should see a Flink job submitted to the Flink Cluster in Flink web UI [localhost:8081](localhost:8081) +Now you should see a Flink job submitted to the Flink Cluster in Flink web UI [localhost:8081]( +localhost:8081) After a few minutes, you can check for the output in the TaskManager output log. diff --git a/docs/content/docs/get-started/quickstart/workflow_agent.md b/docs/content/docs/get-started/quickstart/workflow_agent.md index 76b3c63..e58dbbe 100644 --- a/docs/content/docs/get-started/quickstart/workflow_agent.md +++ b/docs/content/docs/get-started/quickstart/workflow_agent.md @@ -40,6 +40,9 @@ Together, these examples show how to build a multi-agent workflow with Flink Age Create the agents execution environment, and register the available chat model connections, which can be used by the agents, to the environment. +{{< tabs "Prepare Agents Execution Environment" >}} + +{{< tab "Python" >}} ```python # Set up the Flink streaming environment and the Agents execution environment. env = StreamExecutionEnvironment.get_execution_environment() @@ -52,11 +55,32 @@ agents_env.add_resource( ollama_server_descriptor, ) ``` +{{< /tab >}} + +{{< tab "Java" >}} +```Java +// Set up the Flink streaming environment and the Agents execution environment. +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +AgentsExecutionEnvironment agentsEnv = + AgentsExecutionEnvironment.getExecutionEnvironment(env); + +// Add Ollama chat model connection to be used by the ReviewAnalysisAgent. +agentsEnv.addResource( + "ollamaChatModelConnection", + ResourceType.CHAT_MODEL_CONNECTION, + CustomTypesAndResources.OLLAMA_SERVER_DESCRIPTOR); +``` +{{< /tab >}} + +{{< /tabs >}} ### Create the Agents Below is the example code for the `ReviewAnalysisAgent`, which is used to analyze the product reviews and generate a satisfaction score and potential reasons for dissatisfaction. It demonstrates how to define the prompt, tool, chat model, and action for the agent. Also, it shows how to process the chat response and send the output event. For more details, please refer to the [Workflow Agent]({{< ref "docs/development/workflow_agent" >}}) documentation. +{{< tabs "Create the Agents" >}} + +{{< tab "Python" >}} ```python class ReviewAnalysisAgent(Agent): """An agent that uses a large language model (LLM) to analyze product reviews @@ -139,6 +163,100 @@ class ReviewAnalysisAgent(Agent): # To fail the agent, you can raise an exception here. ``` +{{< /tab >}} + +{{< tab "Java" >}} +```Java +/** + * An agent that uses a large language model (LLM) to analyze product reviews and generate a + * satisfaction score and potential reasons for dissatisfaction. + * + * <p>This agent receives a product review and produces a satisfaction score and a list of reasons + * for dissatisfaction. It handles prompt construction, LLM interaction, and output parsing. + */ +public class ReviewAnalysisAgent extends Agent { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @Prompt + public static org.apache.flink.agents.api.prompt.Prompt reviewAnalysisPrompt() { + return REVIEW_ANALYSIS_PROMPT; + } + + @ChatModelSetup + public static ResourceDescriptor reviewAnalysisModel() { + return ResourceDescriptor.Builder.newBuilder(OllamaChatModelSetup.class.getName()) + .addInitialArgument("connection", "ollamaChatModelConnection") + .addInitialArgument("model", "qwen3:8b") + .addInitialArgument("prompt", "reviewAnalysisPrompt") + .addInitialArgument("tools", Collections.singletonList("notifyShippingManager")) + .addInitialArgument("extract_reasoning", "true") + .build(); + } + + /** + * Tool for notifying the shipping manager when product received a negative review due to + * shipping damage. + * + * @param id The id of the product that received a negative review due to shipping damage + * @param review The negative review content + */ + @Tool( + description = + "Notify the shipping manager when product received a negative review due to shipping damage.") + public static void notifyShippingManager( + @ToolParam(name = "id") String id, @ToolParam(name = "review") String review) { + CustomTypesAndResources.notifyShippingManager(id, review); + } + + /** Process input event and send chat request for review analysis. */ + @Action(listenEvents = {InputEvent.class}) + public static void processInput(InputEvent event, RunnerContext ctx) throws Exception { + String input = (String) event.getInput(); + MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + CustomTypesAndResources.ProductReview inputObj = + MAPPER.readValue(input, CustomTypesAndResources.ProductReview.class); + + ctx.getShortTermMemory().set("id", inputObj.getId()); + + String content = + String.format( + "{\n" + "\"id\": %s,\n" + "\"review\": \"%s\"\n" + "}", + inputObj.getId(), inputObj.getReview()); + ChatMessage msg = new ChatMessage(MessageRole.USER, "", Map.of("input", content)); + + ctx.sendEvent(new ChatRequestEvent("reviewAnalysisModel", List.of(msg))); + } + + @Action(listenEvents = ChatResponseEvent.class) + public static void processChatResponse(ChatResponseEvent event, RunnerContext ctx) + throws Exception { + JsonNode jsonNode = MAPPER.readTree(event.getResponse().getContent()); + JsonNode scoreNode = jsonNode.findValue("score"); + JsonNode reasonsNode = jsonNode.findValue("reasons"); + if (scoreNode == null || reasonsNode == null) { + throw new IllegalStateException( + "Invalid response from LLM: missing 'score' or 'reasons' field."); + } + List<String> result = new ArrayList<>(); + if (reasonsNode.isArray()) { + for (JsonNode node : reasonsNode) { + result.add(node.asText()); + } + } + + ctx.sendEvent( + new OutputEvent( + new CustomTypesAndResources.ProductReviewAnalysisRes( + ctx.getShortTermMemory().get("id").getValue().toString(), + scoreNode.asInt(), + result))); + } +} +``` +{{< /tab >}} + +{{< /tabs >}} The code for the `ProductSuggestionAgent`, which is used to generate product improvement suggestions based on the aggregated analysis results, is similar to the `ReviewAnalysisAgent`. @@ -146,6 +264,9 @@ The code for the `ProductSuggestionAgent`, which is used to generate product imp Create the input DataStream by reading the product reviews from a text file as a streaming source, and use the `ReviewAnalysisAgent` to analyze the product reviews and generate the result DataStream. Finally print the result DataStream. +{{< tabs "Integrate the Agents with Flink" >}} + +{{< tab "Python" >}} ```python # Read product reviews from a text file as a streaming source. # Each line in the file should be a JSON string representing a ProductReview. @@ -178,8 +299,40 @@ review_analysis_res_stream.print() # Execute the Flink pipeline. agents_env.execute() ``` +{{< /tab >}} + +{{< tab "Java" >}} +```Java +// Read product reviews from input_data.txt file as a streaming source. +// Each element represents a ProductReview. +DataStream<String> productReviewStream = + env.fromSource( + FileSource.forRecordStreamFormat( + new TextLineInputFormat(), + new Path(inputDataFile.getAbsolutePath())) + .build(), + WatermarkStrategy.noWatermarks(), + "streaming-agent-example"); + +// Use the ReviewAnalysisAgent to analyze each product review. +DataStream<Object> reviewAnalysisResStream = + agentsEnv + .fromDataStream(productReviewStream) + .apply(new ReviewAnalysisAgent()) + .toDataStream(); + +// Print the analysis results to stdout. +reviewAnalysisResStream.print(); + +// Execute the Flink pipeline. +agentsEnv.execute(); +``` +{{< /tab >}} + +{{< /tabs >}} + +## Run the Example -## Run the Examples ### Prerequisites * Unix-like environment (we use Linux, Mac OS X, Cygwin, WSL) @@ -205,40 +358,67 @@ You can refer to the [local installation](https://nightlies.apache.org/flink/fli #### Prepare Flink Agents -We recommand creating a Python virtual environment to install the Flink Agents Python library. +We recommend creating a Python virtual environment to install the Flink Agents Python library. Follow the [installation]({{< ref "docs/get-started/installation" >}}) instructions to install the Flink Agents Python and Java libraries. -#### Prepare Ollama - -Download and install Ollama from the official [website](https://ollama.com/download). - -Then pull the qwen3:8b model, which is required by the quickstart examples +#### Clone the Flink Agents repo +Clone the Flink Agents repo to get quickstart example code. ```bash -ollama pull qwen3:8b +git clone https://github.com/apache/flink-agents.git ``` #### Deploy a Standalone Flink Cluster You can deploy a standalone Flink cluster in your local environment with the following command. +{{< tabs "Deploy a Standalone Flink Cluster" >}} + +{{< tab "Python" >}} ```bash export PYTHONPATH=$(python -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])') ./flink-1.20.3/bin/start-cluster.sh ``` +{{< /tab >}} -You should be able to navigate to the web UI at [localhost:8081](localhost:8081) to view the Flink dashboard and see that the cluster is up and running. +{{< tab "Java" >}} +1. Build Flink Agents from source to generate example jar. See [installation]({{< ref "docs/get-started/installation" >}}) for more details. +2. Copy the Flink Agents example jar to Flink lib directory + ```bash + cp flink-agents/examples/target/flink-agents-examples-$VERSION.jar ./flink-1.20.3/lib/ + ``` +3. Start the Flink cluster + ```bash + ./flink-1.20.3/bin/start-cluster.sh + ``` +{{< /tab >}} -### Submit Flink Agents Job to Standalone Flink Cluster +{{< /tabs >}} +You can refer to the [local cluster](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/try-flink/local_installation/#starting-and-stopping-a-local-cluster) instructions for more detailed step. -#### Clone the Flink Agents repo + +{{< hint info >}} +If you can't navigate to the web UI at [localhost:8081](localhost:8081), you can find the reason in `./flink-1.20.3/log`. If the reason is port conflict, you can change the port in `./flink-1.20.3/conf/config.yaml`. +{{< /hint >}} + +#### Prepare Ollama + +Download and install Ollama from the official [website](https://ollama.com/download). + +Then run the qwen3:8b model, which is required by the quickstart examples ```bash -git clone https://github.com/apache/flink-agents.git +ollama run qwen3:8b ``` +### Submit Flink Agents Job to Standalone Flink Cluster + #### Submit to Flink Cluster + +{{< tabs "Submit to Flink Cluster" >}} + +{{< tab "Python" >}} ```bash export PYTHONPATH=$(python -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])') @@ -248,7 +428,21 @@ export PYTHONPATH=$(python -c 'import sysconfig; print(sysconfig.get_paths()["pu # Run product suggestion example ./flink-1.20.3/bin/flink run -py ./flink-agents/python/flink_agents/examples/quickstart/workflow_multiple_agent_example.py ``` +{{< /tab >}} + +{{< tab "Java" >}} +```bash +# Run review analysis example +./flink-1.20.3/bin/flink run -c org.apache.flink.agents.examples.WorkflowSingleAgentExample ./flink-agents/examples/target/flink-agents-examples-$VERSION.jar + +# Run product suggestion example +./flink-1.20.3/bin/flink run -c org.apache.flink.agents.examples.WorkflowMultipleAgentExample ./flink-agents/examples/target/flink-agents-examples-$VERSION.jar +``` +{{< /tab >}} + +{{< /tabs >}} -Now you should see a Flink job submitted to the Flink Cluster in Flink web UI [localhost:8081](localhost:8081) +Now you should see a Flink job submitted to the Flink Cluster in Flink web UI [localhost:8081]( +localhost:8081) After a few minutes, you can check for the output in the TaskManager output log. diff --git a/plan/src/main/java/org/apache/flink/agents/plan/JavaFunction.java b/plan/src/main/java/org/apache/flink/agents/plan/JavaFunction.java index 2033df6..e007646 100644 --- a/plan/src/main/java/org/apache/flink/agents/plan/JavaFunction.java +++ b/plan/src/main/java/org/apache/flink/agents/plan/JavaFunction.java @@ -49,6 +49,7 @@ public class JavaFunction implements Function { this.qualName = qualName; this.methodName = methodName; this.parameterTypes = parameterTypes; + // TODO: support get method loaded by user code classloader. this.method = Class.forName(qualName).getMethod(methodName, parameterTypes); }
