This is an automated email from the ASF dual-hosted git repository.
sxnan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 6dc22c94 [example] Use Table API in workflow_multiple_agent_example
(#504)
6dc22c94 is described below
commit 6dc22c94e52aa03bce8065a41edfd5eb6ad27ad6
Author: Xuannan <[email protected]>
AuthorDate: Thu Jan 29 15:57:51 2026 +0800
[example] Use Table API in workflow_multiple_agent_example (#504)
---
.../agents/table_review_analysis_agent.py | 153 +++++++++++++++++++++
.../quickstart/workflow_multiple_agent_example.py | 93 ++++++-------
2 files changed, 194 insertions(+), 52 deletions(-)
diff --git
a/python/flink_agents/examples/quickstart/agents/table_review_analysis_agent.py
b/python/flink_agents/examples/quickstart/agents/table_review_analysis_agent.py
new file mode 100644
index 00000000..7f6b1829
--- /dev/null
+++
b/python/flink_agents/examples/quickstart/agents/table_review_analysis_agent.py
@@ -0,0 +1,153 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+import json
+import logging
+from typing import Any
+
+from pyflink.datastream import KeySelector
+
+from flink_agents.api.agents.agent import Agent
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.api.decorators import (
+ action,
+ chat_model_setup,
+ prompt,
+ tool,
+)
+from flink_agents.api.events.chat_event import ChatRequestEvent,
ChatResponseEvent
+from flink_agents.api.events.event import InputEvent, OutputEvent
+from flink_agents.api.prompts.prompt import Prompt
+from flink_agents.api.resource import ResourceDescriptor, ResourceName
+from flink_agents.api.runner_context import RunnerContext
+from flink_agents.examples.quickstart.agents.custom_types_and_resources import
(
+ ProductReviewAnalysisRes,
+ notify_shipping_manager,
+ review_analysis_prompt,
+)
+
+
+class TableKeySelector(KeySelector):
+ """KeySelector for extracting key from dictionary input (Table data)."""
+
+ def get_key(self, value: Any) -> str:
+ """Extract key from dictionary.
+
+ Parameters
+ ----------
+ value : Any
+ The input value, expected to be a dictionary with 'id' key.
+
+ Returns:
+ -------
+ str
+ The id value as the key.
+ """
+ if isinstance(value, dict):
+ return str(value["id"])
+ # Fallback for other types (e.g., Row)
+ return str(value["id"]) if hasattr(value, "__getitem__") else
str(value.id)
+
+
+class TableReviewAnalysisAgent(Agent):
+ """An agent that analyzes product reviews from Flink Table input.
+
+ This agent is designed to work with Flink Table API. It receives input as
+ dictionary (when using from_table()) and produces analysis results
including
+ satisfaction score and reasons for dissatisfaction.
+
+ The main difference from ReviewAnalysisAgent is that this agent handles
+ dictionary input instead of Pydantic objects.
+ """
+
+ @prompt
+ @staticmethod
+ def review_analysis_prompt() -> Prompt:
+ """Prompt for review analysis."""
+ return review_analysis_prompt
+
+ @tool
+ @staticmethod
+ def notify_shipping_manager(id: str, review: str) -> None:
+ """Notify the shipping manager when product received a negative review
due to
+ shipping damage.
+
+ Parameters
+ ----------
+ id : str
+ The id of the product that received a negative review due to
shipping damage
+ review: str
+ The negative review content
+ """
+ notify_shipping_manager(id=id, review=review)
+
+ @chat_model_setup
+ @staticmethod
+ def review_analysis_model() -> ResourceDescriptor:
+ """ChatModel which focus on review analysis."""
+ return ResourceDescriptor(
+ clazz=ResourceName.ChatModel.OLLAMA_SETUP,
+ connection="ollama_server",
+ model="qwen3:8b",
+ prompt="review_analysis_prompt",
+ tools=["notify_shipping_manager"],
+ extract_reasoning=True,
+ )
+
+ @action(InputEvent)
+ @staticmethod
+ def process_input(event: InputEvent, ctx: RunnerContext) -> None:
+ """Process input event from Table data (dictionary format).
+
+ When using from_table(), the input is a dictionary with keys matching
+ the table column names.
+ """
+ # Table input is dictionary format: {"id": "xxx", "review": "xxx",
"ts": 123}
+ input_dict = event.input
+ product_id = str(input_dict["id"])
+ review_text = str(input_dict["review"])
+
+ ctx.short_term_memory.set("id", product_id)
+
+ content = f"""
+ "id": {product_id},
+ "review": {review_text}
+ """
+ msg = ChatMessage(role=MessageRole.USER, extra_args={"input": content})
+ ctx.send_event(ChatRequestEvent(model="review_analysis_model",
messages=[msg]))
+
+ @action(ChatResponseEvent)
+ @staticmethod
+ def process_chat_response(event: ChatResponseEvent, ctx: RunnerContext) ->
None:
+ """Process chat response event and send output event."""
+ try:
+ json_content = json.loads(event.response.content)
+ ctx.send_event(
+ OutputEvent(
+ output=ProductReviewAnalysisRes(
+ id=ctx.short_term_memory.get("id"),
+ score=json_content["score"],
+ reasons=json_content["reasons"],
+ )
+ )
+ )
+ except Exception:
+ logging.exception(
+ f"Error processing chat response {event.response.content}"
+ )
+ # To fail the agent, you can raise an exception here.
diff --git
a/python/flink_agents/examples/quickstart/workflow_multiple_agent_example.py
b/python/flink_agents/examples/quickstart/workflow_multiple_agent_example.py
index c133008f..81350285 100644
--- a/python/flink_agents/examples/quickstart/workflow_multiple_agent_example.py
+++ b/python/flink_agents/examples/quickstart/workflow_multiple_agent_example.py
@@ -15,26 +15,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
-import json
-import logging
from pathlib import Path
from typing import Iterable
-from pyflink.common import Configuration, Duration, Time, WatermarkStrategy
-from pyflink.common.watermark_strategy import TimestampAssigner
+from pyflink.common import Configuration, Time
from pyflink.datastream import (
ProcessWindowFunction,
StreamExecutionEnvironment,
)
-from pyflink.datastream.connectors.file_system import FileSource, StreamFormat
-from pyflink.datastream.window import (
- TumblingProcessingTimeWindows,
-)
+from pyflink.datastream.window import TumblingProcessingTimeWindows
+from pyflink.table import DataTypes, Schema, StreamTableEnvironment,
TableDescriptor
+from pyflink.table.expressions import col
from flink_agents.api.execution_environment import AgentsExecutionEnvironment
from flink_agents.api.resource import ResourceType
from flink_agents.examples.quickstart.agents.custom_types_and_resources import
(
- ProductReview,
ProductReviewSummary,
ollama_server_descriptor,
)
@@ -43,25 +38,15 @@ from
flink_agents.examples.quickstart.agents.product_suggestion_agent import (
)
from flink_agents.examples.quickstart.agents.review_analysis_agent import (
ProductReviewAnalysisRes,
- ReviewAnalysisAgent,
+)
+from flink_agents.examples.quickstart.agents.table_review_analysis_agent
import (
+ TableKeySelector,
+ TableReviewAnalysisAgent,
)
current_dir = Path(__file__).parent
-class MyTimestampAssigner(TimestampAssigner):
- """Assign timestamp to each element."""
-
- def extract_timestamp(self, element: str, record_timestamp: int) -> int:
- """Extract timestamp from JSON string."""
- try:
- json_element = json.loads(element)
- return json_element["ts"] * 1000
- except json.JSONDecodeError:
- logging.exception(f"Error decoding JSON: {element}")
- return record_timestamp
-
-
class AggregateScoreDistributionAndDislikeReasons(ProcessWindowFunction):
"""Aggregate score distribution and dislike reasons."""
@@ -95,7 +80,7 @@ def main() -> None:
"""Main function for the product improvement suggestion quickstart example.
This example demonstrates a multi-stage streaming pipeline using Flink
Agents:
- 1. Reads product reviews from a text file as a streaming source.
+ 1. Reads product reviews from a JSON file using Flink Table API.
2. Uses an LLM agent to analyze each review and extract score and
unsatisfied
reasons.
3. Aggregates the analysis results in 1-minute tumbling windows,
producing score
@@ -110,7 +95,15 @@ def main() -> None:
config.set_string("python.fn-execution.bundle.size", "1")
env = StreamExecutionEnvironment.get_execution_environment(config)
env.set_parallelism(1)
- agents_env = AgentsExecutionEnvironment.get_execution_environment(env)
+
+ # Create StreamTableEnvironment for Table API support.
+ t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+
+ # Create AgentsExecutionEnvironment with both env and t_env to enable
+ # Table API integration.
+ agents_env = AgentsExecutionEnvironment.get_execution_environment(
+ env=env, t_env=t_env
+ )
# Add Ollama chat model connection to be used by the ReviewAnalysisAgent
# and ProductSuggestionAgent.
@@ -120,39 +113,35 @@ def main() -> None:
ollama_server_descriptor,
)
- # Read product reviews from a text file as a streaming source.
+ # Read product reviews from a JSON file using Flink Table API.
# Each line in the file should be a JSON string representing a
ProductReview.
- product_review_stream = (
- env.from_source(
- source=FileSource.for_record_stream_format(
- StreamFormat.text_line_format(),
- f"file:///{current_dir}/resources/product_review.txt",
- )
- .monitor_continuously(Duration.of_minutes(1))
- .build(),
- watermark_strategy=WatermarkStrategy.no_watermarks(),
- source_name="streaming_agent_example",
- )
- .set_parallelism(1)
- .assign_timestamps_and_watermarks(
-
WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(
- MyTimestampAssigner()
- )
- )
- .map(
- lambda x: ProductReview.model_validate_json(
- x
- ) # Deserialize JSON to ProductReview.
+ # Define the source table with watermark based on the 'ts' column.
+ t_env.create_temporary_table(
+ "product_reviews",
+ TableDescriptor.for_connector("filesystem")
+ .schema(
+ Schema.new_builder()
+ .column("id", DataTypes.STRING())
+ .column("review", DataTypes.STRING())
+ .column("ts", DataTypes.BIGINT())
+ .column_by_expression("rowtime", "TO_TIMESTAMP_LTZ(`ts` * 1000,
3)")
+ .watermark("rowtime", "rowtime - INTERVAL '0' SECOND")
+ .build()
)
+ .option("format", "json")
+ .option("path", f"file:///{current_dir}/resources/product_review.txt")
+ .build(),
)
- # Use the ReviewAnalysisAgent (LLM) to analyze each review.
+ input_table = t_env.from_path("product_reviews").select(
+ col("id"), col("review"), col("ts")
+ )
+
+ # Use the TableReviewAnalysisAgent (LLM) to analyze each review.
# The agent extracts the review score and unsatisfied reasons.
review_analysis_res_stream = (
- agents_env.from_datastream(
- input=product_review_stream, key_selector=lambda x: x.id
- )
- .apply(ReviewAnalysisAgent())
+ agents_env.from_table(input=input_table,
key_selector=TableKeySelector())
+ .apply(TableReviewAnalysisAgent())
.to_datastream()
)