This is an automated email from the ASF dual-hosted git repository.

sxnan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new b03e69ad [examples][java] Add Table API example for product review analysis (#530)
b03e69ad is described below

commit b03e69ad65e3115c4130fe24b300d08200d63245
Author: Weiqing Yang <[email protected]>
AuthorDate: Tue Feb 10 19:56:38 2026 -0800

    [examples][java] Add Table API example for product review analysis (#530)
---
 .../examples/WorkflowMultipleAgentExample.java     |  67 +++++----
 .../examples/agents/TableReviewAnalysisAgent.java  | 151 +++++++++++++++++++++
 2 files changed, 190 insertions(+), 28 deletions(-)

diff --git a/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java b/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java
index 874fa9e5..1e56bc09 100644
--- a/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java
+++ b/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java
@@ -25,16 +25,17 @@ import 
org.apache.flink.agents.api.agents.AgentExecutionOptions;
 import org.apache.flink.agents.api.resource.ResourceType;
 import org.apache.flink.agents.examples.agents.CustomTypesAndResources;
 import org.apache.flink.agents.examples.agents.ProductSuggestionAgent;
-import org.apache.flink.agents.examples.agents.ReviewAnalysisAgent;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.connector.file.src.FileSource;
-import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
-import org.apache.flink.core.fs.Path;
+import org.apache.flink.agents.examples.agents.TableReviewAnalysisAgent;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableDescriptor;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.util.Collector;
 
 import java.io.File;
@@ -52,8 +53,9 @@ import static 
org.apache.flink.agents.examples.agents.CustomTypesAndResources.Pr
  * <p>This example demonstrates a multi-stage streaming pipeline using Flink 
Agents:
  *
  * <ol>
- *   <li>Reads product reviews from a source as a streaming source.
- *   <li>Uses an LLM agent to analyze each review and extract score and 
unsatisfied reasons.
+ *   <li>Reads product reviews from a JSON file using Flink Table API.
+ *   <li>Uses an LLM agent (via {@link TableReviewAnalysisAgent}) to analyze 
each review and extract
+ *       score and unsatisfied reasons.
  *   <li>Aggregates the analysis results in 1-minute tumbling windows, 
producing score distributions
  *       and collecting all unsatisfied reasons.
  *   <li>Uses another LLM agent to generate product improvement suggestions 
based on the aggregated
@@ -124,46 +126,56 @@ public class WorkflowMultipleAgentExample {
 
     /** Runs the example pipeline. */
     public static void main(String[] args) throws Exception {
-        // Set up the Flink streaming environment and the Agents execution 
environment.
+        // Set up the Flink streaming environment.
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
+
+        // Create StreamTableEnvironment for Table API support.
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        // Create AgentsExecutionEnvironment with both env and tableEnv to 
enable
+        // Table API integration.
         AgentsExecutionEnvironment agentsEnv =
-                AgentsExecutionEnvironment.getExecutionEnvironment(env);
+                AgentsExecutionEnvironment.getExecutionEnvironment(env, 
tableEnv);
 
         // limit async request to avoid overwhelming ollama server
         agentsEnv.getConfig().set(AgentExecutionOptions.NUM_ASYNC_THREADS, 2);
 
-        // Add Ollama chat model connection to be used by the 
ReviewAnalysisAgent
+        // Add Ollama chat model connection to be used by the 
TableReviewAnalysisAgent
         // and ProductSuggestionAgent.
         agentsEnv.addResource(
                 "ollamaChatModelConnection",
                 ResourceType.CHAT_MODEL_CONNECTION,
                 CustomTypesAndResources.OLLAMA_SERVER_DESCRIPTOR);
 
-        // Read product reviews from input_data.txt file as a streaming source.
-        // Each element represents a ProductReview.
+        // Read product reviews from input_data.txt file using Flink Table API.
+        // Each line in the file is a JSON string representing a product 
review.
         File inputDataFile = copyResource("input_data.txt");
-        DataStream<String> productReviewStream =
-                env.fromSource(
-                        FileSource.forRecordStreamFormat(
-                                        new TextLineInputFormat(),
-                                        new 
Path(inputDataFile.getAbsolutePath()))
-                                .build(),
-                        WatermarkStrategy.noWatermarks(),
-                        "streaming-agent-example");
-
-        // Use the ReviewAnalysisAgent (LLM) to analyze each review.
+        tableEnv.createTemporaryTable(
+                "product_reviews",
+                TableDescriptor.forConnector("filesystem")
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("id", DataTypes.STRING())
+                                        .column("review", DataTypes.STRING())
+                                        .build())
+                        .option("format", "json")
+                        .option("path", inputDataFile.getAbsolutePath())
+                        .build());
+
+        Table inputTable = tableEnv.from("product_reviews");
+
+        // Use the TableReviewAnalysisAgent (LLM) to analyze each review.
         // The agent extracts the review score and unsatisfied reasons.
         DataStream<Object> reviewAnalysisResStream =
                 agentsEnv
-                        .fromDataStream(productReviewStream)
-                        .apply(new ReviewAnalysisAgent())
+                        .fromTable(inputTable, new 
TableReviewAnalysisAgent.RowKeySelector())
+                        .apply(new TableReviewAnalysisAgent())
                         .toDataStream();
 
         // Aggregate the analysis results in 1-minute tumbling windows.
         // This produces a score distribution and collects all unsatisfied 
reasons for
-        // each
-        // product.
+        // each product.
         DataStream<String> aggregatedAnalysisResStream =
                 reviewAnalysisResStream
                         .map(element -> (ProductReviewAnalysisRes) element)
@@ -172,8 +184,7 @@ public class WorkflowMultipleAgentExample {
                         .process(new 
AggregateScoreDistributionAndDislikeReasons());
 
         // Use the ProductSuggestionAgent (LLM) to generate product improvement
-        // suggestions
-        // based on the aggregated analysis results.
+        // suggestions based on the aggregated analysis results.
         DataStream<Object> productSuggestionResStream =
                 agentsEnv
                         .fromDataStream(aggregatedAnalysisResStream)
diff --git a/examples/src/main/java/org/apache/flink/agents/examples/agents/TableReviewAnalysisAgent.java b/examples/src/main/java/org/apache/flink/agents/examples/agents/TableReviewAnalysisAgent.java
new file mode 100644
index 00000000..9b394b75
--- /dev/null
+++ b/examples/src/main/java/org/apache/flink/agents/examples/agents/TableReviewAnalysisAgent.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.examples.agents;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.api.agents.Agent;
+import org.apache.flink.agents.api.annotation.Action;
+import org.apache.flink.agents.api.annotation.ChatModelSetup;
+import org.apache.flink.agents.api.annotation.Prompt;
+import org.apache.flink.agents.api.annotation.Tool;
+import org.apache.flink.agents.api.annotation.ToolParam;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.api.event.ChatRequestEvent;
+import org.apache.flink.agents.api.event.ChatResponseEvent;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceName;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.agents.examples.agents.CustomTypesAndResources.REVIEW_ANALYSIS_PROMPT;
+
+/**
+ * An agent that analyzes product reviews from Flink Table API input.
+ *
+ * <p>This agent is designed to work with the Flink Table API. It receives input as {@link Row}
+ * objects (when using {@code fromTable()}) and produces analysis results including satisfaction
+ * score and reasons for dissatisfaction.
+ *
+ * <p>The main difference from {@link ReviewAnalysisAgent} is that this agent handles {@link Row}
+ * input instead of JSON strings.
+ *
+ * <p>Flow: {@link #processInput} turns each input row into a chat request addressed to the
+ * {@code reviewAnalysisModel} resource; {@link #processChatResponse} parses the JSON reply and
+ * emits a {@code ProductReviewAnalysisRes} as the agent output.
+ */
+public class TableReviewAnalysisAgent extends Agent {
+
+    /** Shared JSON mapper, used to escape request values and to parse the model reply. */
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    /**
+     * Key selector for extracting keys from Row objects used with Table API input.
+     *
+     * <p>Returns the row's {@code id} field for {@link Row} values and an empty string for any
+     * other value. The field is cast to {@code String} as-is, so a row whose {@code id} is null
+     * yields a null key.
+     */
+    public static class RowKeySelector implements KeySelector<Object, String> {
+        @Override
+        public String getKey(Object value) {
+            if (value instanceof Row) {
+                Row row = (Row) value;
+                return (String) row.getField("id");
+            }
+            return "";
+        }
+    }
+
+    /**
+     * Supplies the review-analysis prompt.
+     *
+     * <p>The chat model setup below passes {@code "reviewAnalysisPrompt"} as its {@code prompt}
+     * argument, which matches this method's name (presumably how the framework resolves it).
+     *
+     * @return the shared {@code REVIEW_ANALYSIS_PROMPT}
+     */
+    @Prompt
+    public static org.apache.flink.agents.api.prompt.Prompt reviewAnalysisPrompt() {
+        return REVIEW_ANALYSIS_PROMPT;
+    }
+
+    /**
+     * Describes the chat model used for review analysis.
+     *
+     * <p>The descriptor points at the {@code ollamaChatModelConnection} connection, the {@code
+     * qwen3:8b} model, the {@code reviewAnalysisPrompt} prompt and the {@code
+     * notifyShippingManager} tool, and sets {@code extract_reasoning} to true.
+     *
+     * @return the resource descriptor for the Ollama chat model setup
+     */
+    @ChatModelSetup
+    public static ResourceDescriptor reviewAnalysisModel() {
+        return ResourceDescriptor.Builder.newBuilder(ResourceName.ChatModel.OLLAMA_SETUP)
+                .addInitialArgument("connection", "ollamaChatModelConnection")
+                .addInitialArgument("model", "qwen3:8b")
+                .addInitialArgument("prompt", "reviewAnalysisPrompt")
+                .addInitialArgument("tools", Collections.singletonList("notifyShippingManager"))
+                .addInitialArgument("extract_reasoning", true)
+                .build();
+    }
+
+    /**
+     * Tool for notifying the shipping manager when product received a negative review due to
+     * shipping damage.
+     *
+     * <p>Delegates to {@code CustomTypesAndResources.notifyShippingManager}.
+     *
+     * @param id The id of the product that received a negative review due to shipping damage
+     * @param review The negative review content
+     */
+    @Tool(
+            description =
+                    "Notify the shipping manager when product received a negative review due to shipping damage.")
+    public static void notifyShippingManager(
+            @ToolParam(name = "id") String id, @ToolParam(name = "review") String review) {
+        CustomTypesAndResources.notifyShippingManager(id, review);
+    }
+
+    /**
+     * Process input event from Table data (Row format).
+     *
+     * <p>When using {@code fromTable()}, the input is a {@link Row} with fields matching the table
+     * column names. The {@code id} and {@code review} fields are read from the row, the id is
+     * stored in short-term memory under the key {@code "id"} so that {@link #processChatResponse}
+     * can attach it to the result, and a chat request carrying the review as a JSON document is
+     * sent to the {@code reviewAnalysisModel} resource.
+     *
+     * @param event the input event whose payload is expected to be a {@link Row}
+     * @param ctx the runner context used for short-term memory and for sending events
+     * @throws Exception if the values cannot be serialized to JSON or the context calls fail
+     */
+    @Action(listenEvents = {InputEvent.class})
+    public static void processInput(InputEvent event, RunnerContext ctx) throws Exception {
+        Row row = (Row) event.getInput();
+        String productId = (String) row.getField("id");
+        String reviewText = (String) row.getField("review");
+
+        ctx.getShortTermMemory().set("id", productId);
+
+        // Serialize each value through Jackson so quotes, backslashes and line breaks inside the
+        // review are escaped and the payload stays valid JSON. The layout is identical to the
+        // previous hand-built string for plain text; a null value is rendered as JSON null.
+        String content =
+                String.format(
+                        "{\n\"id\": %s,\n\"review\": %s\n}",
+                        MAPPER.writeValueAsString(productId),
+                        MAPPER.writeValueAsString(reviewText));
+        ChatMessage msg = new ChatMessage(MessageRole.USER, "", Map.of("input", content));
+
+        ctx.sendEvent(new ChatRequestEvent("reviewAnalysisModel", List.of(msg)));
+    }
+
+    /**
+     * Converts the chat model's reply into a {@code ProductReviewAnalysisRes} output event.
+     *
+     * <p>The reply content is parsed as JSON and the first {@code score} and {@code reasons}
+     * fields found anywhere in the tree are used. If {@code reasons} is not an array, the result
+     * carries an empty list of reasons. The product id is read back from short-term memory, where
+     * {@link #processInput} stored it.
+     *
+     * @param event the chat response event carrying the model reply
+     * @param ctx the runner context used for short-term memory and for sending events
+     * @throws IllegalStateException if the reply has no {@code score} or no {@code reasons} field
+     * @throws Exception if the reply content cannot be parsed as JSON
+     */
+    @Action(listenEvents = ChatResponseEvent.class)
+    public static void processChatResponse(ChatResponseEvent event, RunnerContext ctx)
+            throws Exception {
+        JsonNode jsonNode = MAPPER.readTree(event.getResponse().getContent());
+        JsonNode scoreNode = jsonNode.findValue("score");
+        JsonNode reasonsNode = jsonNode.findValue("reasons");
+        if (scoreNode == null || reasonsNode == null) {
+            throw new IllegalStateException(
+                    "Invalid response from LLM: missing 'score' or 'reasons' field.");
+        }
+        List<String> result = new ArrayList<>();
+        if (reasonsNode.isArray()) {
+            for (JsonNode node : reasonsNode) {
+                result.add(node.asText());
+            }
+        }
+
+        ctx.sendEvent(
+                new OutputEvent(
+                        new CustomTypesAndResources.ProductReviewAnalysisRes(
+                                ctx.getShortTermMemory().get("id").getValue().toString(),
+                                scoreNode.asInt(),
+                                result)));
+    }
+}

Reply via email to