This is an automated email from the ASF dual-hosted git repository.
sxnan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new b03e69ad [examples][java] Add Table API example for product review
analysis (#530)
b03e69ad is described below
commit b03e69ad65e3115c4130fe24b300d08200d63245
Author: Weiqing Yang <[email protected]>
AuthorDate: Tue Feb 10 19:56:38 2026 -0800
[examples][java] Add Table API example for product review analysis (#530)
---
.../examples/WorkflowMultipleAgentExample.java | 67 +++++----
.../examples/agents/TableReviewAnalysisAgent.java | 151 +++++++++++++++++++++
2 files changed, 190 insertions(+), 28 deletions(-)
diff --git
a/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java
b/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java
index 874fa9e5..1e56bc09 100644
---
a/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java
+++
b/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java
@@ -25,16 +25,17 @@ import
org.apache.flink.agents.api.agents.AgentExecutionOptions;
import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.examples.agents.CustomTypesAndResources;
import org.apache.flink.agents.examples.agents.ProductSuggestionAgent;
-import org.apache.flink.agents.examples.agents.ReviewAnalysisAgent;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.connector.file.src.FileSource;
-import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
-import org.apache.flink.core.fs.Path;
+import org.apache.flink.agents.examples.agents.TableReviewAnalysisAgent;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableDescriptor;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import java.io.File;
@@ -52,8 +53,9 @@ import static
org.apache.flink.agents.examples.agents.CustomTypesAndResources.Pr
* <p>This example demonstrates a multi-stage streaming pipeline using Flink
Agents:
*
* <ol>
- * <li>Reads product reviews from a source as a streaming source.
- * <li>Uses an LLM agent to analyze each review and extract score and
unsatisfied reasons.
+ * <li>Reads product reviews from a JSON file using Flink Table API.
+ * <li>Uses an LLM agent (via {@link TableReviewAnalysisAgent}) to analyze
each review and extract
+ * score and unsatisfied reasons.
* <li>Aggregates the analysis results in 1-minute tumbling windows,
producing score distributions
* and collecting all unsatisfied reasons.
* <li>Uses another LLM agent to generate product improvement suggestions
based on the aggregated
@@ -124,46 +126,56 @@ public class WorkflowMultipleAgentExample {
/** Runs the example pipeline. */
public static void main(String[] args) throws Exception {
- // Set up the Flink streaming environment and the Agents execution
environment.
+ // Set up the Flink streaming environment.
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
+
+ // Create StreamTableEnvironment for Table API support.
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+ // Create AgentsExecutionEnvironment with both env and tableEnv to
enable
+ // Table API integration.
AgentsExecutionEnvironment agentsEnv =
- AgentsExecutionEnvironment.getExecutionEnvironment(env);
+ AgentsExecutionEnvironment.getExecutionEnvironment(env,
tableEnv);
// limit async request to avoid overwhelming ollama server
agentsEnv.getConfig().set(AgentExecutionOptions.NUM_ASYNC_THREADS, 2);
- // Add Ollama chat model connection to be used by the
ReviewAnalysisAgent
+ // Add Ollama chat model connection to be used by the
TableReviewAnalysisAgent
// and ProductSuggestionAgent.
agentsEnv.addResource(
"ollamaChatModelConnection",
ResourceType.CHAT_MODEL_CONNECTION,
CustomTypesAndResources.OLLAMA_SERVER_DESCRIPTOR);
- // Read product reviews from input_data.txt file as a streaming source.
- // Each element represents a ProductReview.
+ // Read product reviews from input_data.txt file using Flink Table API.
+ // Each line in the file is a JSON string representing a product
review.
File inputDataFile = copyResource("input_data.txt");
- DataStream<String> productReviewStream =
- env.fromSource(
- FileSource.forRecordStreamFormat(
- new TextLineInputFormat(),
- new
Path(inputDataFile.getAbsolutePath()))
- .build(),
- WatermarkStrategy.noWatermarks(),
- "streaming-agent-example");
-
- // Use the ReviewAnalysisAgent (LLM) to analyze each review.
+ tableEnv.createTemporaryTable(
+ "product_reviews",
+ TableDescriptor.forConnector("filesystem")
+ .schema(
+ Schema.newBuilder()
+ .column("id", DataTypes.STRING())
+ .column("review", DataTypes.STRING())
+ .build())
+ .option("format", "json")
+ .option("path", inputDataFile.getAbsolutePath())
+ .build());
+
+ Table inputTable = tableEnv.from("product_reviews");
+
+ // Use the TableReviewAnalysisAgent (LLM) to analyze each review.
// The agent extracts the review score and unsatisfied reasons.
DataStream<Object> reviewAnalysisResStream =
agentsEnv
- .fromDataStream(productReviewStream)
- .apply(new ReviewAnalysisAgent())
+ .fromTable(inputTable, new
TableReviewAnalysisAgent.RowKeySelector())
+ .apply(new TableReviewAnalysisAgent())
.toDataStream();
// Aggregate the analysis results in 1-minute tumbling windows.
// This produces a score distribution and collects all unsatisfied
reasons for
- // each
- // product.
+ // each product.
DataStream<String> aggregatedAnalysisResStream =
reviewAnalysisResStream
.map(element -> (ProductReviewAnalysisRes) element)
@@ -172,8 +184,7 @@ public class WorkflowMultipleAgentExample {
.process(new
AggregateScoreDistributionAndDislikeReasons());
// Use the ProductSuggestionAgent (LLM) to generate product improvement
- // suggestions
- // based on the aggregated analysis results.
+ // suggestions based on the aggregated analysis results.
DataStream<Object> productSuggestionResStream =
agentsEnv
.fromDataStream(aggregatedAnalysisResStream)
diff --git
a/examples/src/main/java/org/apache/flink/agents/examples/agents/TableReviewAnalysisAgent.java
b/examples/src/main/java/org/apache/flink/agents/examples/agents/TableReviewAnalysisAgent.java
new file mode 100644
index 00000000..9b394b75
--- /dev/null
+++
b/examples/src/main/java/org/apache/flink/agents/examples/agents/TableReviewAnalysisAgent.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.examples.agents;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.api.agents.Agent;
+import org.apache.flink.agents.api.annotation.Action;
+import org.apache.flink.agents.api.annotation.ChatModelSetup;
+import org.apache.flink.agents.api.annotation.Prompt;
+import org.apache.flink.agents.api.annotation.Tool;
+import org.apache.flink.agents.api.annotation.ToolParam;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.api.event.ChatRequestEvent;
+import org.apache.flink.agents.api.event.ChatResponseEvent;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceName;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.agents.examples.agents.CustomTypesAndResources.REVIEW_ANALYSIS_PROMPT;
+
+/**
+ * An agent that analyzes product reviews from Flink Table API input.
+ *
+ * <p>This agent is designed to work with the Flink Table API. It receives
input as {@link Row}
+ * objects (when using {@code fromTable()}) and produces analysis results
including satisfaction
+ * score and reasons for dissatisfaction.
+ *
+ * <p>The main difference from {@link ReviewAnalysisAgent} is that this agent
handles {@link Row}
+ * input instead of JSON strings.
+ */
+public class TableReviewAnalysisAgent extends Agent {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ /** Key selector for extracting keys from Row objects used with Table API
input. */
+ public static class RowKeySelector implements KeySelector<Object, String> {
+ @Override
+ public String getKey(Object value) {
+ if (value instanceof Row) {
+ Row row = (Row) value;
+ return (String) row.getField("id");
+ }
+ return "";
+ }
+ }
+
+ @Prompt
+ public static org.apache.flink.agents.api.prompt.Prompt
reviewAnalysisPrompt() {
+ return REVIEW_ANALYSIS_PROMPT;
+ }
+
+ @ChatModelSetup
+ public static ResourceDescriptor reviewAnalysisModel() {
+ return
ResourceDescriptor.Builder.newBuilder(ResourceName.ChatModel.OLLAMA_SETUP)
+ .addInitialArgument("connection", "ollamaChatModelConnection")
+ .addInitialArgument("model", "qwen3:8b")
+ .addInitialArgument("prompt", "reviewAnalysisPrompt")
+ .addInitialArgument("tools",
Collections.singletonList("notifyShippingManager"))
+ .addInitialArgument("extract_reasoning", true)
+ .build();
+ }
+
+ /**
+ * Tool for notifying the shipping manager when product received a
negative review due to
+ * shipping damage.
+ *
+ * @param id The id of the product that received a negative review due to
shipping damage
+ * @param review The negative review content
+ */
+ @Tool(
+ description =
+ "Notify the shipping manager when product received a
negative review due to shipping damage.")
+ public static void notifyShippingManager(
+ @ToolParam(name = "id") String id, @ToolParam(name = "review")
String review) {
+ CustomTypesAndResources.notifyShippingManager(id, review);
+ }
+
+ /**
+ * Process input event from Table data (Row format).
+ *
+ * <p>When using {@code fromTable()}, the input is a {@link Row} with
fields matching the table
+ * column names.
+ */
+ @Action(listenEvents = {InputEvent.class})
+ public static void processInput(InputEvent event, RunnerContext ctx)
throws Exception {
+ Row row = (Row) event.getInput();
+ String productId = (String) row.getField("id");
+ String reviewText = (String) row.getField("review");
+
+ ctx.getShortTermMemory().set("id", productId);
+
+ String content =
+ String.format(
+ "{\n" + "\"id\": \"%s\",\n" + "\"review\": \"%s\"\n" +
"}",
+ productId, reviewText);
+ ChatMessage msg = new ChatMessage(MessageRole.USER, "",
Map.of("input", content));
+
+ ctx.sendEvent(new ChatRequestEvent("reviewAnalysisModel",
List.of(msg)));
+ }
+
+ @Action(listenEvents = ChatResponseEvent.class)
+ public static void processChatResponse(ChatResponseEvent event,
RunnerContext ctx)
+ throws Exception {
+ JsonNode jsonNode = MAPPER.readTree(event.getResponse().getContent());
+ JsonNode scoreNode = jsonNode.findValue("score");
+ JsonNode reasonsNode = jsonNode.findValue("reasons");
+ if (scoreNode == null || reasonsNode == null) {
+ throw new IllegalStateException(
+ "Invalid response from LLM: missing 'score' or 'reasons'
field.");
+ }
+ List<String> result = new ArrayList<>();
+ if (reasonsNode.isArray()) {
+ for (JsonNode node : reasonsNode) {
+ result.add(node.asText());
+ }
+ }
+
+ ctx.sendEvent(
+ new OutputEvent(
+ new CustomTypesAndResources.ProductReviewAnalysisRes(
+
ctx.getShortTermMemory().get("id").getValue().toString(),
+ scoreNode.asInt(),
+ result)));
+ }
+}