This is an automated email from the ASF dual-hosted git repository.

sxnan pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new 554928e2 [example] Use Table API in workflow_multiple_agent_example 
(#504)
554928e2 is described below

commit 554928e23a9a3ef25debf9e7a0c4a7f67e1bc08d
Author: Xuannan <[email protected]>
AuthorDate: Thu Jan 29 15:57:51 2026 +0800

    [example] Use Table API in workflow_multiple_agent_example (#504)
---
 .../agents/table_review_analysis_agent.py          | 153 +++++++++++++++++++++
 .../quickstart/workflow_multiple_agent_example.py  |  93 ++++++-------
 2 files changed, 194 insertions(+), 52 deletions(-)

diff --git 
a/python/flink_agents/examples/quickstart/agents/table_review_analysis_agent.py 
b/python/flink_agents/examples/quickstart/agents/table_review_analysis_agent.py
new file mode 100644
index 00000000..7f6b1829
--- /dev/null
+++ 
b/python/flink_agents/examples/quickstart/agents/table_review_analysis_agent.py
@@ -0,0 +1,153 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+import json
+import logging
+from typing import Any
+
+from pyflink.datastream import KeySelector
+
+from flink_agents.api.agents.agent import Agent
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.api.decorators import (
+    action,
+    chat_model_setup,
+    prompt,
+    tool,
+)
+from flink_agents.api.events.chat_event import ChatRequestEvent, 
ChatResponseEvent
+from flink_agents.api.events.event import InputEvent, OutputEvent
+from flink_agents.api.prompts.prompt import Prompt
+from flink_agents.api.resource import ResourceDescriptor, ResourceName
+from flink_agents.api.runner_context import RunnerContext
+from flink_agents.examples.quickstart.agents.custom_types_and_resources import 
(
+    ProductReviewAnalysisRes,
+    notify_shipping_manager,
+    review_analysis_prompt,
+)
+
+
+class TableKeySelector(KeySelector):
+    """KeySelector for extracting key from dictionary input (Table data)."""
+
+    def get_key(self, value: Any) -> str:
+        """Extract key from dictionary.
+
+        Parameters
+        ----------
+        value : Any
+            The input value, expected to be a dictionary with 'id' key.
+
+        Returns:
+        -------
+        str
+            The id value as the key.
+        """
+        if isinstance(value, dict):
+            return str(value["id"])
+        # Fallback for other types (e.g., Row)
+        return str(value["id"]) if hasattr(value, "__getitem__") else 
str(value.id)
+
+
+class TableReviewAnalysisAgent(Agent):
+    """An agent that analyzes product reviews from Flink Table input.
+
+    This agent is designed to work with Flink Table API. It receives input as
+    dictionary (when using from_table()) and produces analysis results 
including
+    satisfaction score and reasons for dissatisfaction.
+
+    The main difference from ReviewAnalysisAgent is that this agent handles
+    dictionary input instead of Pydantic objects.
+    """
+
+    @prompt
+    @staticmethod
+    def review_analysis_prompt() -> Prompt:
+        """Prompt for review analysis."""
+        return review_analysis_prompt
+
+    @tool
+    @staticmethod
+    def notify_shipping_manager(id: str, review: str) -> None:
+        """Notify the shipping manager when product received a negative review 
due to
+        shipping damage.
+
+        Parameters
+        ----------
+        id : str
+            The id of the product that received a negative review due to 
shipping damage
+        review: str
+            The negative review content
+        """
+        notify_shipping_manager(id=id, review=review)
+
+    @chat_model_setup
+    @staticmethod
+    def review_analysis_model() -> ResourceDescriptor:
+        """ChatModel which focus on review analysis."""
+        return ResourceDescriptor(
+            clazz=ResourceName.ChatModel.OLLAMA_SETUP,
+            connection="ollama_server",
+            model="qwen3:8b",
+            prompt="review_analysis_prompt",
+            tools=["notify_shipping_manager"],
+            extract_reasoning=True,
+        )
+
+    @action(InputEvent)
+    @staticmethod
+    def process_input(event: InputEvent, ctx: RunnerContext) -> None:
+        """Process input event from Table data (dictionary format).
+
+        When using from_table(), the input is a dictionary with keys matching
+        the table column names.
+        """
+        # Table input is dictionary format: {"id": "xxx", "review": "xxx", 
"ts": 123}
+        input_dict = event.input
+        product_id = str(input_dict["id"])
+        review_text = str(input_dict["review"])
+
+        ctx.short_term_memory.set("id", product_id)
+
+        content = f"""
+            "id": {product_id},
+            "review": {review_text}
+        """
+        msg = ChatMessage(role=MessageRole.USER, extra_args={"input": content})
+        ctx.send_event(ChatRequestEvent(model="review_analysis_model", 
messages=[msg]))
+
+    @action(ChatResponseEvent)
+    @staticmethod
+    def process_chat_response(event: ChatResponseEvent, ctx: RunnerContext) -> 
None:
+        """Process chat response event and send output event."""
+        try:
+            json_content = json.loads(event.response.content)
+            ctx.send_event(
+                OutputEvent(
+                    output=ProductReviewAnalysisRes(
+                        id=ctx.short_term_memory.get("id"),
+                        score=json_content["score"],
+                        reasons=json_content["reasons"],
+                    )
+                )
+            )
+        except Exception:
+            logging.exception(
+                f"Error processing chat response {event.response.content}"
+            )
+            # To fail the agent, you can raise an exception here.
diff --git 
a/python/flink_agents/examples/quickstart/workflow_multiple_agent_example.py 
b/python/flink_agents/examples/quickstart/workflow_multiple_agent_example.py
index c133008f..81350285 100644
--- a/python/flink_agents/examples/quickstart/workflow_multiple_agent_example.py
+++ b/python/flink_agents/examples/quickstart/workflow_multiple_agent_example.py
@@ -15,26 +15,21 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
#################################################################################
-import json
-import logging
 from pathlib import Path
 from typing import Iterable
 
-from pyflink.common import Configuration, Duration, Time, WatermarkStrategy
-from pyflink.common.watermark_strategy import TimestampAssigner
+from pyflink.common import Configuration, Time
 from pyflink.datastream import (
     ProcessWindowFunction,
     StreamExecutionEnvironment,
 )
-from pyflink.datastream.connectors.file_system import FileSource, StreamFormat
-from pyflink.datastream.window import (
-    TumblingProcessingTimeWindows,
-)
+from pyflink.datastream.window import TumblingProcessingTimeWindows
+from pyflink.table import DataTypes, Schema, StreamTableEnvironment, 
TableDescriptor
+from pyflink.table.expressions import col
 
 from flink_agents.api.execution_environment import AgentsExecutionEnvironment
 from flink_agents.api.resource import ResourceType
 from flink_agents.examples.quickstart.agents.custom_types_and_resources import 
(
-    ProductReview,
     ProductReviewSummary,
     ollama_server_descriptor,
 )
@@ -43,25 +38,15 @@ from 
flink_agents.examples.quickstart.agents.product_suggestion_agent import (
 )
 from flink_agents.examples.quickstart.agents.review_analysis_agent import (
     ProductReviewAnalysisRes,
-    ReviewAnalysisAgent,
+)
+from flink_agents.examples.quickstart.agents.table_review_analysis_agent 
import (
+    TableKeySelector,
+    TableReviewAnalysisAgent,
 )
 
 current_dir = Path(__file__).parent
 
 
-class MyTimestampAssigner(TimestampAssigner):
-    """Assign timestamp to each element."""
-
-    def extract_timestamp(self, element: str, record_timestamp: int) -> int:
-        """Extract timestamp from JSON string."""
-        try:
-            json_element = json.loads(element)
-            return json_element["ts"] * 1000
-        except json.JSONDecodeError:
-            logging.exception(f"Error decoding JSON: {element}")
-            return record_timestamp
-
-
 class AggregateScoreDistributionAndDislikeReasons(ProcessWindowFunction):
     """Aggregate score distribution and dislike reasons."""
 
@@ -95,7 +80,7 @@ def main() -> None:
     """Main function for the product improvement suggestion quickstart example.
 
     This example demonstrates a multi-stage streaming pipeline using Flink 
Agents:
-      1. Reads product reviews from a text file as a streaming source.
+      1. Reads product reviews from a JSON file using Flink Table API.
       2. Uses an LLM agent to analyze each review and extract score and 
unsatisfied
          reasons.
       3. Aggregates the analysis results in 1-minute tumbling windows, 
producing score
@@ -110,7 +95,15 @@ def main() -> None:
     config.set_string("python.fn-execution.bundle.size", "1")
     env = StreamExecutionEnvironment.get_execution_environment(config)
     env.set_parallelism(1)
-    agents_env = AgentsExecutionEnvironment.get_execution_environment(env)
+
+    # Create StreamTableEnvironment for Table API support.
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+
+    # Create AgentsExecutionEnvironment with both env and t_env to enable
+    # Table API integration.
+    agents_env = AgentsExecutionEnvironment.get_execution_environment(
+        env=env, t_env=t_env
+    )
 
     # Add Ollama chat model connection to be used by the ReviewAnalysisAgent
     # and ProductSuggestionAgent.
@@ -120,39 +113,35 @@ def main() -> None:
         ollama_server_descriptor,
     )
 
-    # Read product reviews from a text file as a streaming source.
+    # Read product reviews from a JSON file using Flink Table API.
     # Each line in the file should be a JSON string representing a 
ProductReview.
-    product_review_stream = (
-        env.from_source(
-            source=FileSource.for_record_stream_format(
-                StreamFormat.text_line_format(),
-                f"file:///{current_dir}/resources/product_review.txt",
-            )
-            .monitor_continuously(Duration.of_minutes(1))
-            .build(),
-            watermark_strategy=WatermarkStrategy.no_watermarks(),
-            source_name="streaming_agent_example",
-        )
-        .set_parallelism(1)
-        .assign_timestamps_and_watermarks(
-            
WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(
-                MyTimestampAssigner()
-            )
-        )
-        .map(
-            lambda x: ProductReview.model_validate_json(
-                x
-            )  # Deserialize JSON to ProductReview.
+    # Define the source table with watermark based on the 'ts' column.
+    t_env.create_temporary_table(
+        "product_reviews",
+        TableDescriptor.for_connector("filesystem")
+        .schema(
+            Schema.new_builder()
+            .column("id", DataTypes.STRING())
+            .column("review", DataTypes.STRING())
+            .column("ts", DataTypes.BIGINT())
+            .column_by_expression("rowtime", "TO_TIMESTAMP_LTZ(`ts` * 1000, 
3)")
+            .watermark("rowtime", "rowtime - INTERVAL '0' SECOND")
+            .build()
         )
+        .option("format", "json")
+        .option("path", f"file:///{current_dir}/resources/product_review.txt")
+        .build(),
     )
 
-    # Use the ReviewAnalysisAgent (LLM) to analyze each review.
+    input_table = t_env.from_path("product_reviews").select(
+        col("id"), col("review"), col("ts")
+    )
+
+    # Use the TableReviewAnalysisAgent (LLM) to analyze each review.
     # The agent extracts the review score and unsatisfied reasons.
     review_analysis_res_stream = (
-        agents_env.from_datastream(
-            input=product_review_stream, key_selector=lambda x: x.id
-        )
-        .apply(ReviewAnalysisAgent())
+        agents_env.from_table(input=input_table, 
key_selector=TableKeySelector())
+        .apply(TableReviewAnalysisAgent())
         .to_datastream()
     )
 

Reply via email to