This is an automated email from the ASF dual-hosted git repository.
wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new c0f532d2 [examples] Quickstart process_chat_response fails fast on
malformed JSON (#811)
c0f532d2 is described below
commit c0f532d22f63540d714320a4f775e826f376e0b8
Author: Wenjin Xie <[email protected]>
AuthorDate: Mon Jun 8 18:47:04 2026 +0800
[examples] Quickstart process_chat_response fails fast on malformed JSON
(#811)
Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
.../examples/agents/ProductSuggestionAgent.java | 5 ++++
.../examples/agents/ReviewAnalysisAgent.java | 5 ++++
.../examples/agents/TableReviewAnalysisAgent.java | 5 ++++
.../quickstart/agents/product_suggestion_agent.py | 30 ++++++++++------------
.../quickstart/agents/review_analysis_agent.py | 30 ++++++++++------------
.../agents/table_review_analysis_agent.py | 29 ++++++++++-----------
6 files changed, 57 insertions(+), 47 deletions(-)
diff --git
a/examples/src/main/java/org/apache/flink/agents/examples/agents/ProductSuggestionAgent.java
b/examples/src/main/java/org/apache/flink/agents/examples/agents/ProductSuggestionAgent.java
index 303ef420..2d1e1222 100644
---
a/examples/src/main/java/org/apache/flink/agents/examples/agents/ProductSuggestionAgent.java
+++
b/examples/src/main/java/org/apache/flink/agents/examples/agents/ProductSuggestionAgent.java
@@ -98,6 +98,11 @@ public class ProductSuggestionAgent extends Agent {
@Action(listenEventTypes = {ChatResponseEvent.EVENT_TYPE})
public static void processChatResponse(Event event, RunnerContext ctx)
throws Exception {
ChatResponseEvent chatResponse = ChatResponseEvent.fromEvent(event);
+ // Fail fast on a malformed LLM response: a parse error here
propagates and fails the
+ // agent, so a dropped input is surfaced instead of silently lost. In
production, choose
+ // the handling that fits your pipeline: raise to fail the input (as
below), emit an
+ // OutputEvent carrying an error sentinel, or send a custom error
event so downstream
+ // operators can detect the failure.
JsonNode jsonNode =
MAPPER.readTree(chatResponse.getResponse().getContent());
JsonNode suggestionsNode = jsonNode.findValue("suggestion_list");
List<String> suggestions = new ArrayList<>();
diff --git
a/examples/src/main/java/org/apache/flink/agents/examples/agents/ReviewAnalysisAgent.java
b/examples/src/main/java/org/apache/flink/agents/examples/agents/ReviewAnalysisAgent.java
index 05dff401..7fbfe752 100644
---
a/examples/src/main/java/org/apache/flink/agents/examples/agents/ReviewAnalysisAgent.java
+++
b/examples/src/main/java/org/apache/flink/agents/examples/agents/ReviewAnalysisAgent.java
@@ -111,6 +111,11 @@ public class ReviewAnalysisAgent extends Agent {
@Action(listenEventTypes = {ChatResponseEvent.EVENT_TYPE})
public static void processChatResponse(Event event, RunnerContext ctx)
throws Exception {
ChatResponseEvent chatResponse = ChatResponseEvent.fromEvent(event);
+ // Fail fast on a malformed LLM response: a parse error here
propagates and fails the
+ // agent, so a dropped input is surfaced instead of silently lost. In
production, choose
+ // the handling that fits your pipeline: raise to fail the input (as
below), emit an
+ // OutputEvent carrying an error sentinel, or send a custom error
event so downstream
+ // operators can detect the failure.
JsonNode jsonNode =
MAPPER.readTree(chatResponse.getResponse().getContent());
JsonNode scoreNode = jsonNode.findValue("score");
JsonNode reasonsNode = jsonNode.findValue("reasons");
diff --git
a/examples/src/main/java/org/apache/flink/agents/examples/agents/TableReviewAnalysisAgent.java
b/examples/src/main/java/org/apache/flink/agents/examples/agents/TableReviewAnalysisAgent.java
index 77b314f4..18d754be 100644
---
a/examples/src/main/java/org/apache/flink/agents/examples/agents/TableReviewAnalysisAgent.java
+++
b/examples/src/main/java/org/apache/flink/agents/examples/agents/TableReviewAnalysisAgent.java
@@ -131,6 +131,11 @@ public class TableReviewAnalysisAgent extends Agent {
@Action(listenEventTypes = {ChatResponseEvent.EVENT_TYPE})
public static void processChatResponse(Event event, RunnerContext ctx)
throws Exception {
ChatResponseEvent chatResponse = ChatResponseEvent.fromEvent(event);
+ // Fail fast on a malformed LLM response: a parse error here
propagates and fails the
+ // agent, so a dropped input is surfaced instead of silently lost. In
production, choose
+ // the handling that fits your pipeline: raise to fail the input (as
below), emit an
+ // OutputEvent carrying an error sentinel, or send a custom error
event so downstream
+ // operators can detect the failure.
JsonNode jsonNode =
MAPPER.readTree(chatResponse.getResponse().getContent());
JsonNode scoreNode = jsonNode.findValue("score");
JsonNode reasonsNode = jsonNode.findValue("reasons");
diff --git
a/python/flink_agents/examples/quickstart/agents/product_suggestion_agent.py
b/python/flink_agents/examples/quickstart/agents/product_suggestion_agent.py
index f4e1c592..20350835 100644
--- a/python/flink_agents/examples/quickstart/agents/product_suggestion_agent.py
+++ b/python/flink_agents/examples/quickstart/agents/product_suggestion_agent.py
@@ -16,7 +16,6 @@
# limitations under the License.
#################################################################################
import json
-import logging
from flink_agents.api.agents.agent import Agent
from flink_agents.api.chat_message import ChatMessage, MessageRole
@@ -93,20 +92,19 @@ class ProductSuggestionAgent(Agent):
def process_chat_response(event: Event, ctx: RunnerContext) -> None:
"""Process chat response event."""
chat_response = ChatResponseEvent.from_event(event)
- try:
- json_content = json.loads(chat_response.response.content)
- ctx.send_event(
- OutputEvent(
- output=ProductSuggestion(
- id=ctx.short_term_memory.get("id"),
- score_hist=ctx.short_term_memory.get("score_hist"),
- suggestions=json_content["suggestion_list"],
- )
+ # Fail fast on a malformed LLM response: a parse error here propagates
+ # and fails the agent, so a dropped input is surfaced instead of
+ # silently lost. In production, choose the handling that fits your
+ # pipeline: raise to fail the input (as below), emit an OutputEvent
+ # carrying an error sentinel, or send a custom error event so
+ # downstream operators can detect the failure.
+ json_content = json.loads(chat_response.response.content)
+ ctx.send_event(
+ OutputEvent(
+ output=ProductSuggestion(
+ id=ctx.short_term_memory.get("id"),
+ score_hist=ctx.short_term_memory.get("score_hist"),
+ suggestions=json_content["suggestion_list"],
)
)
- except Exception:
- logging.exception(
- f"Error processing chat response
{chat_response.response.content}"
- )
-
- # To fail the agent, you can raise an exception here.
+ )
diff --git
a/python/flink_agents/examples/quickstart/agents/review_analysis_agent.py
b/python/flink_agents/examples/quickstart/agents/review_analysis_agent.py
index 3af49129..8bd3b74d 100644
--- a/python/flink_agents/examples/quickstart/agents/review_analysis_agent.py
+++ b/python/flink_agents/examples/quickstart/agents/review_analysis_agent.py
@@ -16,7 +16,6 @@
# limitations under the License.
#################################################################################
import json
-import logging
from flink_agents.api.agents.agent import Agent
from flink_agents.api.chat_message import ChatMessage, MessageRole
@@ -109,20 +108,19 @@ class ReviewAnalysisAgent(Agent):
def process_chat_response(event: Event, ctx: RunnerContext) -> None:
"""Process chat response event and send output event."""
chat_response = ChatResponseEvent.from_event(event)
- try:
- json_content = json.loads(chat_response.response.content)
- ctx.send_event(
- OutputEvent(
- output=ProductReviewAnalysisRes(
- id=ctx.short_term_memory.get("id"),
- score=json_content["score"],
- reasons=json_content["reasons"],
- )
+ # Fail fast on a malformed LLM response: a parse error here propagates
+ # and fails the agent, so a dropped input is surfaced instead of
+ # silently lost. In production, choose the handling that fits your
+ # pipeline: raise to fail the input (as below), emit an OutputEvent
+ # carrying an error sentinel, or send a custom error event so
+ # downstream operators can detect the failure.
+ json_content = json.loads(chat_response.response.content)
+ ctx.send_event(
+ OutputEvent(
+ output=ProductReviewAnalysisRes(
+ id=ctx.short_term_memory.get("id"),
+ score=json_content["score"],
+ reasons=json_content["reasons"],
)
)
- except Exception:
- logging.exception(
- f"Error processing chat response
{chat_response.response.content}"
- )
-
- # To fail the agent, you can raise an exception here.
+ )
diff --git
a/python/flink_agents/examples/quickstart/agents/table_review_analysis_agent.py
b/python/flink_agents/examples/quickstart/agents/table_review_analysis_agent.py
index 64db9351..1ee1f801 100644
---
a/python/flink_agents/examples/quickstart/agents/table_review_analysis_agent.py
+++
b/python/flink_agents/examples/quickstart/agents/table_review_analysis_agent.py
@@ -17,7 +17,6 @@
#################################################################################
import json
-import logging
from typing import Any
from pyflink.datastream import KeySelector
@@ -141,19 +140,19 @@ class TableReviewAnalysisAgent(Agent):
def process_chat_response(event: Event, ctx: RunnerContext) -> None:
"""Process chat response event and send output event."""
chat_response = ChatResponseEvent.from_event(event)
- try:
- json_content = json.loads(chat_response.response.content)
- ctx.send_event(
- OutputEvent(
- output=ProductReviewAnalysisRes(
- id=ctx.short_term_memory.get("id"),
- score=json_content["score"],
- reasons=json_content["reasons"],
- )
+ # Fail fast on a malformed LLM response: a parse error here propagates
+ # and fails the agent, so a dropped input is surfaced instead of
+ # silently lost. In production, choose the handling that fits your
+ # pipeline: raise to fail the input (as below), emit an OutputEvent
+ # carrying an error sentinel, or send a custom error event so
+ # downstream operators can detect the failure.
+ json_content = json.loads(chat_response.response.content)
+ ctx.send_event(
+ OutputEvent(
+ output=ProductReviewAnalysisRes(
+ id=ctx.short_term_memory.get("id"),
+ score=json_content["score"],
+ reasons=json_content["reasons"],
)
)
- except Exception:
- logging.exception(
- f"Error processing chat response
{chat_response.response.content}"
- )
- # To fail the agent, you can raise an exception here.
+ )