weiqingy commented on code in PR #830:
URL: https://github.com/apache/flink-agents/pull/830#discussion_r3394054297


##########
python/flink_agents/examples/quickstart/agents/parallel_chat_agent.py:
##########
@@ -0,0 +1,174 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import json
+import os
+from typing import Any, Dict, Tuple
+
+from pydantic import BaseModel
+
+from flink_agents.api.agents.agent import STRUCTURED_OUTPUT, Agent
+from flink_agents.api.agents.types import OutputSchema
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.api.decorators import action, chat_model_setup
+from flink_agents.api.events.chat_event import ChatRequestEvent, 
ChatResponseEvent
+from flink_agents.api.events.event import Event, InputEvent, OutputEvent
+from flink_agents.api.resource import ResourceDescriptor, ResourceName
+from flink_agents.api.runner_context import RunnerContext
+from flink_agents.examples.quickstart.agents.custom_types_and_resources import 
(
+    AspectResponse,
+    SummaryResponse,
+)
+
+OLLAMA_MODEL = os.environ.get("PARALLEL_CHAT_OLLAMA_MODEL", "qwen3:1.7b")
+
+INPUT_TEXT = "The food here is great, but the service is too slow"
+ASPECTS: Tuple[str, ...] = ("taste", "service", "price")
+N_ASPECTS = len(ASPECTS)
+
+PARALLEL_SYSTEM_PROMPT = (
+    "You are a sentiment analysis assistant. Return JSON: "
+    '{"aspect":"<dimension>", "result":"<positive|negative|not_mentioned>"}'
+    " — no explanation, no extra fields."
+)
+AGGREGATE_SYSTEM_PROMPT = (
+    "You are a summary assistant. Based on the sentiment judgments for three "
+    "dimensions, compose a brief one-line evaluation. Return JSON: "
+    '{"summary":"taste: service: price:"} — return only this JSON.'
+)
+
+
+def _init_row(event: Event) -> Dict[str, Any]:
+    """Build a row skeleton from the InputEvent."""
+    payload = InputEvent.from_event(event).input
+    return {"id": payload["id"], "text": payload["text"], "sentiments": {}}
+
+
+def _save_row(ctx: RunnerContext, row: Dict[str, Any]) -> None:
+    """Write the row to sensory memory."""
+    ctx.sensory_memory.set("res", json.dumps(row, ensure_ascii=False))
+
+
+def _load_row(ctx: RunnerContext) -> Dict[str, Any]:
+    """Read the row from sensory memory."""
+    return json.loads(ctx.sensory_memory.get("res"))
+
+
+def _build_aspect_request(text: str, aspect: str) -> ChatRequestEvent:
+    """Build a ChatRequestEvent for a single aspect dimension."""
+    return ChatRequestEvent(
+        model="sentiment_model",
+        messages=[
+            ChatMessage(role=MessageRole.SYSTEM, 
content=PARALLEL_SYSTEM_PROMPT),
+            ChatMessage(
+                role=MessageRole.USER,
+                content=f'Judge the "{aspect}" dimension: {text}',
+            ),
+        ],
+        output_schema=OutputSchema(output_schema=AspectResponse),
+    )
+
+
+def _build_summarize_request(row: Dict[str, Any]) -> ChatRequestEvent:
+    """Build a ChatRequestEvent for the aggregation phase."""
+    sentiments = row["sentiments"]
+    body = (
+        f"Original: {row['text']}\n"
+        + "Judgments: "
+        + " ".join(f"{a}:{sentiments[a]}" for a in ASPECTS)
+    )
+    return ChatRequestEvent(
+        model="sentiment_model",
+        messages=[
+            ChatMessage(role=MessageRole.SYSTEM, 
content=AGGREGATE_SYSTEM_PROMPT),
+            ChatMessage(role=MessageRole.USER, content=body),
+        ],
+        output_schema=OutputSchema(output_schema=SummaryResponse),
+    )
+
+
+def _build_output_event(row: Dict[str, Any], parsed: SummaryResponse) -> 
OutputEvent:
+    """Pack row fields and summary into the final OutputEvent."""
+    return OutputEvent(
+        output={"id": row["id"], "text": row["text"], "summary": 
parsed.summary}
+    )
+
+
+def _parse_response(event: Event) -> AspectResponse | SummaryResponse:
+    """Parse a ChatResponseEvent into a structured response object."""
+    response = ChatResponseEvent.from_event(event).response
+    raw = response.extra_args[STRUCTURED_OUTPUT]
+    if isinstance(raw, BaseModel):
+        return raw
+    if "summary" in raw:
+        return SummaryResponse.model_validate(raw)
+    return AspectResponse.model_validate(raw)
+
+
+def _is_final(parsed: AspectResponse | SummaryResponse) -> bool:
+    """Return True if the parsed response is from the aggregation phase."""
+    return isinstance(parsed, SummaryResponse)
+
+
+def _all_aspects_received(row: Dict[str, Any]) -> bool:
+    """Return True if all aspect judgments have been collected."""
+    return len(row["sentiments"]) == N_ASPECTS
+
+
+class ParallelChatAgent(Agent):
+    """An agent that demonstrates parallel LLM invocations via fan-out of
+    multiple ChatRequestEvent events.
+
+    This agent receives a restaurant review and uses an LLM to judge sentiment
+    along multiple dimensions in parallel, then aggregates the results into a
+    one-line summary with a final LLM call. It handles prompt construction,
+    parallel chat dispatch, response accumulation, and output assembly.
+    """
+
+    @chat_model_setup
+    @staticmethod
+    def sentiment_model() -> ResourceDescriptor:
+        """ChatModel for sentiment analysis."""
+        return ResourceDescriptor(
+            clazz=ResourceName.ChatModel.OLLAMA_SETUP,
+            connection="ollama_server",
+            model=OLLAMA_MODEL,
+            extract_reasoning=True,
+        )
+
+    @action(InputEvent.EVENT_TYPE)
+    @staticmethod
+    def request_aspect_judgments(event: Event, ctx: RunnerContext) -> None:
+        """Process input event and send chat requests for each aspect."""
+        row = _init_row(event)
+        _save_row(ctx, row)
+        for aspect in ASPECTS:
+            ctx.send_event(_build_aspect_request(row["text"], aspect))
+
+    @action(ChatResponseEvent.EVENT_TYPE)
+    @staticmethod
+    def handle_response(event: Event, ctx: RunnerContext) -> None:
+        """Process chat response event and send output event."""
+        parsed = _parse_response(event)
+        row = _load_row(ctx)
+        if _is_final(parsed):
+            ctx.send_event(_build_output_event(row, parsed))
+            return
+        row["sentiments"][parsed.aspect] = parsed.result

Review Comment:
   The accumulator is keyed on `parsed.aspect` — the string the LLM put in its 
response — while the completion check is `len(row["sentiments"]) == N_ASPECTS`. 
Since the key is whatever the model echoes back rather than the aspect each 
request was dispatched for, two failure modes open up with a 
small/non-deterministic model like the documented `qwen3:1.7b`: if the model 
returns the same `aspect` value twice, the map never reaches size 3 and the 
summarize request is never sent (the input silently produces no output); if it 
returns a label outside `ASPECTS`, the later `_build_summarize_request` lookup 
over the fixed `ASPECTS` list misses. The two languages even diverge on that 
miss — Python raises `KeyError` (line 92), while Java yields a silent 
`"taste:null"` (`ParallelChatAgent.java:123`).
   
   Was correlating by the dispatched aspect considered here — e.g. carrying it 
through a `prompt_args` round-trip or a `request_id` — rather than trusting the 
model to echo it back? If the "trust the echoed aspect" simplification is 
intentional for the demo, would it be worth a doc note that the design assumes 
the model returns exactly one of the three labels, and that a non-conforming 
response breaks collection? That assumption is currently the load-bearing part 
of the example and it's invisible to a reader.



##########
python/flink_agents/examples/quickstart/agents/parallel_chat_agent.py:
##########
@@ -0,0 +1,174 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import json
+import os
+from typing import Any, Dict, Tuple
+
+from pydantic import BaseModel
+
+from flink_agents.api.agents.agent import STRUCTURED_OUTPUT, Agent
+from flink_agents.api.agents.types import OutputSchema
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.api.decorators import action, chat_model_setup
+from flink_agents.api.events.chat_event import ChatRequestEvent, 
ChatResponseEvent
+from flink_agents.api.events.event import Event, InputEvent, OutputEvent
+from flink_agents.api.resource import ResourceDescriptor, ResourceName
+from flink_agents.api.runner_context import RunnerContext
+from flink_agents.examples.quickstart.agents.custom_types_and_resources import 
(
+    AspectResponse,
+    SummaryResponse,
+)
+
+OLLAMA_MODEL = os.environ.get("PARALLEL_CHAT_OLLAMA_MODEL", "qwen3:1.7b")
+
+INPUT_TEXT = "The food here is great, but the service is too slow"
+ASPECTS: Tuple[str, ...] = ("taste", "service", "price")
+N_ASPECTS = len(ASPECTS)
+
+PARALLEL_SYSTEM_PROMPT = (
+    "You are a sentiment analysis assistant. Return JSON: "
+    '{"aspect":"<dimension>", "result":"<positive|negative|not_mentioned>"}'
+    " — no explanation, no extra fields."
+)
+AGGREGATE_SYSTEM_PROMPT = (

Review Comment:
   The two `AGGREGATE_SYSTEM_PROMPT`s ask the model for materially different 
summary formats. The Java side (`ParallelChatAgent.java:71-76`) spells out the 
value vocabulary inline — `{"summary":"taste:<positive/negative/not_mentioned>, 
service:<...>, price:<...>"}` — while this one gives an empty template, 
`{"summary":"taste: service: price:"}`. From the same input the two examples 
will produce visibly different summaries. The `PARALLEL_SYSTEM_PROMPT` is 
identical across both languages, which is what makes this read as an oversight 
rather than a deliberate per-language choice — and CLAUDE.md asks for 
Java/Python parity on cross-language features. Would it make sense to bring 
these two in line?



##########
python/flink_agents/examples/quickstart/agents/custom_types_and_resources.py:
##########
@@ -196,6 +196,21 @@ class ProductReviewAnalysisRes(BaseModel):
     reasons: list[str]
 
 
+# Custom types for parallel chat agent.
+class AspectResponse(BaseModel):
+    """LLM response for a single aspect judgment."""
+
+    aspect: str
+    result: str
+
+
+class SummaryResponse(BaseModel):
+    """LLM response for the aggregation phase."""
+
+    summary: str
+

Review Comment:
   Three blank lines between `SummaryResponse` and the next statement — ruff 
flags this as `E303 too many blank lines (3)`, so `./tools/lint.sh -c` will 
fail in CI. Reduce to two.



##########
examples/src/main/java/org/apache/flink/agents/examples/agents/ParallelChatAgent.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.examples.agents;
+
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.api.agents.Agent;
+import org.apache.flink.agents.api.annotation.Action;
+import org.apache.flink.agents.api.annotation.ChatModelSetup;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.api.event.ChatRequestEvent;
+import org.apache.flink.agents.api.event.ChatResponseEvent;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceName;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+
+import static org.apache.flink.agents.api.agents.Agent.STRUCTURED_OUTPUT;
+
+/**
+ * An agent that demonstrates parallel LLM invocations via fan-out of multiple 
{@link
+ * ChatRequestEvent} events.
+ *
+ * <p>This agent receives a restaurant review and uses an LLM to judge 
sentiment along multiple
+ * dimensions (taste / service / price) in parallel, then aggregates the 
results into a one-line
+ * summary with a final LLM call. It handles prompt construction, parallel 
chat dispatch, response
+ * accumulation, and output assembly.
+ *
+ * <p><b>JDK version note:</b> On JDK 21+, the framework uses the Continuation 
API to execute
+ * concurrent chat actions in parallel, so the wall clock time is roughly 
"slowest single branch +
+ * aggregation call". On JDK &lt; 21, the framework silently falls back to 
sequential execution —
+ * the result is identical, but the LLM calls run one after another.
+ */
+public class ParallelChatAgent extends Agent {
+
+    /** Ollama model name, configurable via environment variable. */
+    public static final String OLLAMA_MODEL =
+            System.getenv().getOrDefault("PARALLEL_CHAT_OLLAMA_MODEL", 
"qwen3:1.7b");
+
+    /** Input text for the demo. */
+    public static final String INPUT_TEXT = "The food here is great, but the 
service is too slow";
+
+    private static final String[] ASPECTS = {"taste", "service", "price"};
+    private static final int N_ASPECTS = ASPECTS.length;
+
+    private static final String PARALLEL_SYSTEM_PROMPT =
+            "You are a sentiment analysis assistant. Return JSON: "
+                    + "{\"aspect\":\"<dimension>\", 
\"result\":\"<positive|negative|not_mentioned>\"}"
+                    + " — no explanation, no extra fields.";
+    private static final String AGGREGATE_SYSTEM_PROMPT =
+            "You are a summary assistant. Based on the sentiment judgments for 
three "
+                    + "dimensions, compose a brief one-line evaluation. Return 
JSON: "
+                    + "{\"summary\":\"taste:<positive/negative/not_mentioned>, 
"
+                    + "service:<positive/negative/not_mentioned>, "
+                    + "price:<positive/negative/not_mentioned>\"} — return 
only this JSON.";
+
+    @ChatModelSetup
+    public static ResourceDescriptor sentimentModel() {
+        return 
ResourceDescriptor.Builder.newBuilder(ResourceName.ChatModel.OLLAMA_SETUP)
+                .addInitialArgument("connection", "ollamaChatModelConnection")
+                .addInitialArgument("model", OLLAMA_MODEL)
+                .addInitialArgument("extract_reasoning", true)
+                .build();
+    }
+
+    private static Map<String, Object> initRow(Event event) {
+        InputEvent inputEvent = InputEvent.fromEvent(event);
+        CustomTypesAndResources.SentimentRequest request =
+                (CustomTypesAndResources.SentimentRequest) 
inputEvent.getInput();
+        Map<String, Object> row = new HashMap<>();
+        row.put("id", request.getId());
+        row.put("text", request.getText());
+        row.put("sentiments", new HashMap<String, String>());
+        return row;
+    }
+
+    private static void saveRow(RunnerContext ctx, Map<String, Object> row) 
throws Exception {
+        ctx.getSensoryMemory().set("res", row);

Review Comment:
   `saveRow`/`loadRow` here store and read the row `Map<String,Object>` object 
directly, while the Python `_save_row`/`_load_row` round-trip through 
`json.dumps`/`json.loads` (`parallel_chat_agent.py:63`). Both work for this 
example, but in a teaching example built around cross-language parity a reader 
comparing the two sides sees Python defensively serializing (Pemja-backed 
memory can't always hold arbitrary Python objects) while Java stores a raw 
nested map. I'm not sure this matters for the demo, so genuinely a question: is 
the divergence intentional, and if so would a one-line doc note on why Python 
serializes help — so a Python reader doesn't infer that `set()` of an arbitrary 
nested dict is always safe?



##########
python/flink_agents/examples/quickstart/agents/parallel_chat_agent.py:
##########
@@ -0,0 +1,174 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import json
+import os
+from typing import Any, Dict, Tuple
+
+from pydantic import BaseModel
+
+from flink_agents.api.agents.agent import STRUCTURED_OUTPUT, Agent
+from flink_agents.api.agents.types import OutputSchema
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.api.decorators import action, chat_model_setup
+from flink_agents.api.events.chat_event import ChatRequestEvent, 
ChatResponseEvent
+from flink_agents.api.events.event import Event, InputEvent, OutputEvent
+from flink_agents.api.resource import ResourceDescriptor, ResourceName
+from flink_agents.api.runner_context import RunnerContext
+from flink_agents.examples.quickstart.agents.custom_types_and_resources import 
(
+    AspectResponse,
+    SummaryResponse,
+)
+
+OLLAMA_MODEL = os.environ.get("PARALLEL_CHAT_OLLAMA_MODEL", "qwen3:1.7b")
+
+INPUT_TEXT = "The food here is great, but the service is too slow"
+ASPECTS: Tuple[str, ...] = ("taste", "service", "price")
+N_ASPECTS = len(ASPECTS)
+
+PARALLEL_SYSTEM_PROMPT = (
+    "You are a sentiment analysis assistant. Return JSON: "
+    '{"aspect":"<dimension>", "result":"<positive|negative|not_mentioned>"}'
+    " — no explanation, no extra fields."
+)
+AGGREGATE_SYSTEM_PROMPT = (
+    "You are a summary assistant. Based on the sentiment judgments for three "
+    "dimensions, compose a brief one-line evaluation. Return JSON: "
+    '{"summary":"taste: service: price:"} — return only this JSON.'
+)
+
+
+def _init_row(event: Event) -> Dict[str, Any]:
+    """Build a row skeleton from the InputEvent."""
+    payload = InputEvent.from_event(event).input
+    return {"id": payload["id"], "text": payload["text"], "sentiments": {}}
+
+
+def _save_row(ctx: RunnerContext, row: Dict[str, Any]) -> None:
+    """Write the row to sensory memory."""
+    ctx.sensory_memory.set("res", json.dumps(row, ensure_ascii=False))
+
+
+def _load_row(ctx: RunnerContext) -> Dict[str, Any]:
+    """Read the row from sensory memory."""
+    return json.loads(ctx.sensory_memory.get("res"))
+
+
+def _build_aspect_request(text: str, aspect: str) -> ChatRequestEvent:
+    """Build a ChatRequestEvent for a single aspect dimension."""
+    return ChatRequestEvent(
+        model="sentiment_model",
+        messages=[
+            ChatMessage(role=MessageRole.SYSTEM, 
content=PARALLEL_SYSTEM_PROMPT),
+            ChatMessage(
+                role=MessageRole.USER,
+                content=f'Judge the "{aspect}" dimension: {text}',
+            ),
+        ],
+        output_schema=OutputSchema(output_schema=AspectResponse),
+    )
+
+
+def _build_summarize_request(row: Dict[str, Any]) -> ChatRequestEvent:
+    """Build a ChatRequestEvent for the aggregation phase."""
+    sentiments = row["sentiments"]
+    body = (
+        f"Original: {row['text']}\n"
+        + "Judgments: "
+        + " ".join(f"{a}:{sentiments[a]}" for a in ASPECTS)
+    )
+    return ChatRequestEvent(
+        model="sentiment_model",
+        messages=[
+            ChatMessage(role=MessageRole.SYSTEM, 
content=AGGREGATE_SYSTEM_PROMPT),
+            ChatMessage(role=MessageRole.USER, content=body),
+        ],
+        output_schema=OutputSchema(output_schema=SummaryResponse),
+    )
+
+
+def _build_output_event(row: Dict[str, Any], parsed: SummaryResponse) -> 
OutputEvent:
+    """Pack row fields and summary into the final OutputEvent."""
+    return OutputEvent(
+        output={"id": row["id"], "text": row["text"], "summary": 
parsed.summary}
+    )
+
+
+def _parse_response(event: Event) -> AspectResponse | SummaryResponse:

Review Comment:
   With an `OutputSchema(output_schema=...)` of a `BaseModel`, 
`_generate_structured_output` always stores a validated pydantic instance, so 
`isinstance(raw, BaseModel)` is always true and the two `model_validate` 
fallbacks below it are dead for this example. Not incorrect, but in a teaching 
example a dead branch invites the reader to wonder when it fires. Would 
dropping the dict fallback (or a one-line note on when `raw` would be a dict) 
keep the example's intent clearer?



##########
docs/content/docs/get-started/quickstart/parallel_llm.md:
##########
@@ -0,0 +1,349 @@
+---
+title: 'Parallel LLM Calls'
+weight: 3
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Overview
+
+Flink Agents supports parallel LLM invocations via multi-action fan-out. By 
emitting multiple `ChatRequestEvent` events from a single action, the 
framework's built-in chat action executes the corresponding LLM calls 
concurrently — no external orchestration is required.
+
+This quickstart introduces an example that demonstrates how to build a 
parallel LLM workflow with Flink Agents:
+
+The **Parallel Sentiment Analysis** agent processes a restaurant review and 
judges sentiment along three dimensions (taste / service / price) in parallel, 
then aggregates the results into a one-line summary with a final LLM call. The 
end-to-end wall clock time is roughly "slowest single branch + aggregation 
call", rather than the sum of all four calls.
+
+{{< hint info >}}
+**JDK version note (Java only):** On JDK 21+, the framework uses the 
Continuation API to execute concurrent chat actions in parallel. On JDK < 21, 
the framework silently falls back to sequential execution — the result is 
identical, but the LLM calls run one after another. Python uses native 
coroutines and always executes in parallel regardless of the JDK version.
+{{< /hint >}}
+
+## Code Walkthrough
+
+### Prepare Agents Execution Environment
+
+Create the agents execution environment, and register the available chat model 
connection to the environment.
+
+{{< tabs "Prepare Agents Execution Environment" >}}
+
+{{< tab "Python" >}}
+```python
+# Set up the Flink streaming environment and the Agents execution environment.
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_parallelism(1)
+agents_env = AgentsExecutionEnvironment.get_execution_environment(env)
+
+# Add Ollama chat model connection to be used by the ParallelChatAgent.
+agents_env.add_resource(
+    "ollama_server",
+    ResourceType.CHAT_MODEL_CONNECTION,
+    ResourceDescriptor(
+        clazz=ResourceName.ChatModel.OLLAMA_CONNECTION,
+        request_timeout=240.0,
+    ),
+)
+```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```Java
+// Set up the Flink streaming environment and the Agents execution environment.
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setParallelism(1);
+AgentsExecutionEnvironment agentsEnv =
+        AgentsExecutionEnvironment.getExecutionEnvironment(env);
+
+// Add Ollama chat model connection to be used by the ParallelChatAgent.
+agentsEnv.addResource(
+        "ollamaChatModelConnection",
+        ResourceType.CHAT_MODEL_CONNECTION,
+        
ResourceDescriptor.Builder.newBuilder(ResourceName.ChatModel.OLLAMA_CONNECTION)
+                .addInitialArgument("endpoint", "http://localhost:11434";)
+                .addInitialArgument("requestTimeout", 240)
+                .build());
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+### Create the Agent
+
+Below is the example code for the `ParallelChatAgent`. The agent defines a 
chat model for sentiment analysis and two actions: `request_aspect_judgments` 
fans out one `ChatRequestEvent` per dimension, and `handle_response` collects 
the results, triggers the aggregation call, and emits the final output. For 
more details, please refer to the [Workflow Agent]({{< ref 
"docs/development/workflow_agent" >}}) documentation.
+
+{{< tabs "Create the Agent" >}}
+
+{{< tab "Python" >}}
+```python
+class ParallelChatAgent(Agent):
+    """An agent that demonstrates parallel LLM invocations via fan-out of
+    multiple ChatRequestEvent events.
+
+    This agent receives a restaurant review and uses an LLM to judge sentiment
+    along multiple dimensions in parallel, then aggregates the results into a
+    one-line summary with a final LLM call. It handles prompt construction,
+    parallel chat dispatch, response accumulation, and output assembly.
+    """
+
+    @chat_model_setup
+    @staticmethod
+    def sentiment_model() -> ResourceDescriptor:
+        """ChatModel for sentiment analysis."""
+        return ResourceDescriptor(
+            clazz=ResourceName.ChatModel.OLLAMA_SETUP,
+            connection="ollama_server",
+            model=OLLAMA_MODEL,
+            extract_reasoning=True,
+        )
+
+    @action(InputEvent.EVENT_TYPE)
+    @staticmethod
+    def request_aspect_judgments(event: Event, ctx: RunnerContext) -> None:
+        """Process input event and send chat requests for each aspect."""
+        row = _init_row(event)
+        _save_row(ctx, row)
+        for aspect in ASPECTS:
+            ctx.send_event(_build_aspect_request(row["text"], aspect))
+
+    @action(ChatResponseEvent.EVENT_TYPE)
+    @staticmethod
+    def handle_response(event: Event, ctx: RunnerContext) -> None:
+        """Process chat response event and send output event."""
+        parsed = _parse_response(event)

Review Comment:
   The walkthrough shows `handle_response` calling `_parse_response`, but that 
helper and the others (`_init_row`, `_build_aspect_request`, 
`_all_aspects_received`, `_build_summarize_request`, and the Java counterparts) 
aren't shown anywhere on the page. The collection logic that the inline 
questions probe — the aspect-keying and the type-based response dispatch — 
lives entirely in those helpers, so the part a reader most needs to understand 
the parallel pattern is the part that's elided. Would inlining the helper 
bodies (or linking to `parallel_chat_agent.py` / `ParallelChatAgent.java`) help 
the lesson land?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to