This is an automated email from the ASF dual-hosted git repository.

wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new c0f532d2 [examples] Quickstart process_chat_response fails fast on 
malformed JSON (#811)
c0f532d2 is described below

commit c0f532d22f63540d714320a4f775e826f376e0b8
Author: Wenjin Xie <[email protected]>
AuthorDate: Mon Jun 8 18:47:04 2026 +0800

    [examples] Quickstart process_chat_response fails fast on malformed JSON 
(#811)
    
    Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
 .../examples/agents/ProductSuggestionAgent.java    |  5 ++++
 .../examples/agents/ReviewAnalysisAgent.java       |  5 ++++
 .../examples/agents/TableReviewAnalysisAgent.java  |  5 ++++
 .../quickstart/agents/product_suggestion_agent.py  | 30 ++++++++++------------
 .../quickstart/agents/review_analysis_agent.py     | 30 ++++++++++------------
 .../agents/table_review_analysis_agent.py          | 29 ++++++++++-----------
 6 files changed, 57 insertions(+), 47 deletions(-)

diff --git 
a/examples/src/main/java/org/apache/flink/agents/examples/agents/ProductSuggestionAgent.java
 
b/examples/src/main/java/org/apache/flink/agents/examples/agents/ProductSuggestionAgent.java
index 303ef420..2d1e1222 100644
--- 
a/examples/src/main/java/org/apache/flink/agents/examples/agents/ProductSuggestionAgent.java
+++ 
b/examples/src/main/java/org/apache/flink/agents/examples/agents/ProductSuggestionAgent.java
@@ -98,6 +98,11 @@ public class ProductSuggestionAgent extends Agent {
     @Action(listenEventTypes = {ChatResponseEvent.EVENT_TYPE})
     public static void processChatResponse(Event event, RunnerContext ctx) 
throws Exception {
         ChatResponseEvent chatResponse = ChatResponseEvent.fromEvent(event);
+        // Fail fast on a malformed LLM response: a parse error here 
propagates and fails the
+        // agent, so a dropped input is surfaced instead of silently lost. In 
production, choose
+        // the handling that fits your pipeline: raise to fail the input (as 
below), emit an
+        // OutputEvent carrying an error sentinel, or send a custom error 
event so downstream
+        // operators can detect the failure.
         JsonNode jsonNode = 
MAPPER.readTree(chatResponse.getResponse().getContent());
         JsonNode suggestionsNode = jsonNode.findValue("suggestion_list");
         List<String> suggestions = new ArrayList<>();
diff --git 
a/examples/src/main/java/org/apache/flink/agents/examples/agents/ReviewAnalysisAgent.java
 
b/examples/src/main/java/org/apache/flink/agents/examples/agents/ReviewAnalysisAgent.java
index 05dff401..7fbfe752 100644
--- 
a/examples/src/main/java/org/apache/flink/agents/examples/agents/ReviewAnalysisAgent.java
+++ 
b/examples/src/main/java/org/apache/flink/agents/examples/agents/ReviewAnalysisAgent.java
@@ -111,6 +111,11 @@ public class ReviewAnalysisAgent extends Agent {
     @Action(listenEventTypes = {ChatResponseEvent.EVENT_TYPE})
     public static void processChatResponse(Event event, RunnerContext ctx) 
throws Exception {
         ChatResponseEvent chatResponse = ChatResponseEvent.fromEvent(event);
+        // Fail fast on a malformed LLM response: a parse error here 
propagates and fails the
+        // agent, so a dropped input is surfaced instead of silently lost. In 
production, choose
+        // the handling that fits your pipeline: raise to fail the input (as 
below), emit an
+        // OutputEvent carrying an error sentinel, or send a custom error 
event so downstream
+        // operators can detect the failure.
         JsonNode jsonNode = 
MAPPER.readTree(chatResponse.getResponse().getContent());
         JsonNode scoreNode = jsonNode.findValue("score");
         JsonNode reasonsNode = jsonNode.findValue("reasons");
diff --git 
a/examples/src/main/java/org/apache/flink/agents/examples/agents/TableReviewAnalysisAgent.java
 
b/examples/src/main/java/org/apache/flink/agents/examples/agents/TableReviewAnalysisAgent.java
index 77b314f4..18d754be 100644
--- 
a/examples/src/main/java/org/apache/flink/agents/examples/agents/TableReviewAnalysisAgent.java
+++ 
b/examples/src/main/java/org/apache/flink/agents/examples/agents/TableReviewAnalysisAgent.java
@@ -131,6 +131,11 @@ public class TableReviewAnalysisAgent extends Agent {
     @Action(listenEventTypes = {ChatResponseEvent.EVENT_TYPE})
     public static void processChatResponse(Event event, RunnerContext ctx) 
throws Exception {
         ChatResponseEvent chatResponse = ChatResponseEvent.fromEvent(event);
+        // Fail fast on a malformed LLM response: a parse error here 
propagates and fails the
+        // agent, so a dropped input is surfaced instead of silently lost. In 
production, choose
+        // the handling that fits your pipeline: raise to fail the input (as 
below), emit an
+        // OutputEvent carrying an error sentinel, or send a custom error 
event so downstream
+        // operators can detect the failure.
         JsonNode jsonNode = 
MAPPER.readTree(chatResponse.getResponse().getContent());
         JsonNode scoreNode = jsonNode.findValue("score");
         JsonNode reasonsNode = jsonNode.findValue("reasons");
diff --git 
a/python/flink_agents/examples/quickstart/agents/product_suggestion_agent.py 
b/python/flink_agents/examples/quickstart/agents/product_suggestion_agent.py
index f4e1c592..20350835 100644
--- a/python/flink_agents/examples/quickstart/agents/product_suggestion_agent.py
+++ b/python/flink_agents/examples/quickstart/agents/product_suggestion_agent.py
@@ -16,7 +16,6 @@
 # limitations under the License.
 
#################################################################################
 import json
-import logging
 
 from flink_agents.api.agents.agent import Agent
 from flink_agents.api.chat_message import ChatMessage, MessageRole
@@ -93,20 +92,19 @@ class ProductSuggestionAgent(Agent):
     def process_chat_response(event: Event, ctx: RunnerContext) -> None:
         """Process chat response event."""
         chat_response = ChatResponseEvent.from_event(event)
-        try:
-            json_content = json.loads(chat_response.response.content)
-            ctx.send_event(
-                OutputEvent(
-                    output=ProductSuggestion(
-                        id=ctx.short_term_memory.get("id"),
-                        score_hist=ctx.short_term_memory.get("score_hist"),
-                        suggestions=json_content["suggestion_list"],
-                    )
+        # Fail fast on a malformed LLM response: a parse error here propagates
+        # and fails the agent, so a dropped input is surfaced instead of
+        # silently lost. In production, choose the handling that fits your
+        # pipeline: raise to fail the input (as below), emit an OutputEvent
+        # carrying an error sentinel, or send a custom error event so
+        # downstream operators can detect the failure.
+        json_content = json.loads(chat_response.response.content)
+        ctx.send_event(
+            OutputEvent(
+                output=ProductSuggestion(
+                    id=ctx.short_term_memory.get("id"),
+                    score_hist=ctx.short_term_memory.get("score_hist"),
+                    suggestions=json_content["suggestion_list"],
                 )
             )
-        except Exception:
-            logging.exception(
-                f"Error processing chat response 
{chat_response.response.content}"
-            )
-
-            # To fail the agent, you can raise an exception here.
+        )
diff --git 
a/python/flink_agents/examples/quickstart/agents/review_analysis_agent.py 
b/python/flink_agents/examples/quickstart/agents/review_analysis_agent.py
index 3af49129..8bd3b74d 100644
--- a/python/flink_agents/examples/quickstart/agents/review_analysis_agent.py
+++ b/python/flink_agents/examples/quickstart/agents/review_analysis_agent.py
@@ -16,7 +16,6 @@
 # limitations under the License.
 
#################################################################################
 import json
-import logging
 
 from flink_agents.api.agents.agent import Agent
 from flink_agents.api.chat_message import ChatMessage, MessageRole
@@ -109,20 +108,19 @@ class ReviewAnalysisAgent(Agent):
     def process_chat_response(event: Event, ctx: RunnerContext) -> None:
         """Process chat response event and send output event."""
         chat_response = ChatResponseEvent.from_event(event)
-        try:
-            json_content = json.loads(chat_response.response.content)
-            ctx.send_event(
-                OutputEvent(
-                    output=ProductReviewAnalysisRes(
-                        id=ctx.short_term_memory.get("id"),
-                        score=json_content["score"],
-                        reasons=json_content["reasons"],
-                    )
+        # Fail fast on a malformed LLM response: a parse error here propagates
+        # and fails the agent, so a dropped input is surfaced instead of
+        # silently lost. In production, choose the handling that fits your
+        # pipeline: raise to fail the input (as below), emit an OutputEvent
+        # carrying an error sentinel, or send a custom error event so
+        # downstream operators can detect the failure.
+        json_content = json.loads(chat_response.response.content)
+        ctx.send_event(
+            OutputEvent(
+                output=ProductReviewAnalysisRes(
+                    id=ctx.short_term_memory.get("id"),
+                    score=json_content["score"],
+                    reasons=json_content["reasons"],
                 )
             )
-        except Exception:
-            logging.exception(
-                f"Error processing chat response 
{chat_response.response.content}"
-            )
-
-            # To fail the agent, you can raise an exception here.
+        )
diff --git 
a/python/flink_agents/examples/quickstart/agents/table_review_analysis_agent.py 
b/python/flink_agents/examples/quickstart/agents/table_review_analysis_agent.py
index 64db9351..1ee1f801 100644
--- 
a/python/flink_agents/examples/quickstart/agents/table_review_analysis_agent.py
+++ 
b/python/flink_agents/examples/quickstart/agents/table_review_analysis_agent.py
@@ -17,7 +17,6 @@
 
#################################################################################
 
 import json
-import logging
 from typing import Any
 
 from pyflink.datastream import KeySelector
@@ -141,19 +140,19 @@ class TableReviewAnalysisAgent(Agent):
     def process_chat_response(event: Event, ctx: RunnerContext) -> None:
         """Process chat response event and send output event."""
         chat_response = ChatResponseEvent.from_event(event)
-        try:
-            json_content = json.loads(chat_response.response.content)
-            ctx.send_event(
-                OutputEvent(
-                    output=ProductReviewAnalysisRes(
-                        id=ctx.short_term_memory.get("id"),
-                        score=json_content["score"],
-                        reasons=json_content["reasons"],
-                    )
+        # Fail fast on a malformed LLM response: a parse error here propagates
+        # and fails the agent, so a dropped input is surfaced instead of
+        # silently lost. In production, choose the handling that fits your
+        # pipeline: raise to fail the input (as below), emit an OutputEvent
+        # carrying an error sentinel, or send a custom error event so
+        # downstream operators can detect the failure.
+        json_content = json.loads(chat_response.response.content)
+        ctx.send_event(
+            OutputEvent(
+                output=ProductReviewAnalysisRes(
+                    id=ctx.short_term_memory.get("id"),
+                    score=json_content["score"],
+                    reasons=json_content["reasons"],
                 )
             )
-        except Exception:
-            logging.exception(
-                f"Error processing chat response 
{chat_response.response.content}"
-            )
-            # To fail the agent, you can raise an exception here.
+        )

Reply via email to