This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-a2a.git
The following commit(s) were added to refs/heads/main by this push:
new 6eaf9ee Add web-ui chat demo (#4)
6eaf9ee is described below
commit 6eaf9eec15ddf63964659264c529578d437e6ed4
Author: Drizzle <[email protected]>
AuthorDate: Tue Dec 9 10:02:03 2025 +0800
Add web-ui chat demo (#4)
* update
Change-Id: I2b5f6364699c1c9e07b6ad381b801c49ce641559
* optimize the code
Change-Id: If2495360c76970ac88387e59e3ac15a6237e2f3e
* update
Change-Id: Iceef7a1bd1d05bfa498694ba7c3a007bbfdf378e
* update
Change-Id: Id0b75d941f02387d7b05a96731ea936526bae012
* add spring web demo
Change-Id: I74483972366bd35fee4a8e6c58b928204b3669ed
* update example
Change-Id: I00ec96acdbf1b7b5aabad5d28b10667b605635e1
* update
Change-Id: I0f51bf41c079d782a1a3adbbd388a32a50170980
* update
Change-Id: Id0e84eb0f4aa59d7aa020b36a7de85a8583f142d
---------
Co-authored-by: drizzle.zk <[email protected]>
---
example/rocketmq-multiagent-base-adk/README.md | 18 +
.../SupervisorAgent-Web/pom.xml | 142 +++
.../org/example/RocketMQA2AChatApplication.java | 27 +
.../src/main/java/org/example/common/TaskInfo.java | 78 ++
.../org/example/controller/A2AChatController.java | 79 ++
.../java/org/example/service/AgentService.java | 457 ++++++++++
.../src/main/resources/application.yml | 16 +
.../src/main/resources/static/index.html | 964 +++++++++++++++++++++
.../src/main/java/agent/AgentExecutorProducer.java | 46 +-
.../src/main/java/agent/AgentExecutorProducer.java | 42 +-
example/rocketmq-multiagent-base-adk/img_2.png | Bin 0 -> 1149856 bytes
example/rocketmq-multiagent-base-adk/img_3.png | Bin 0 -> 1476406 bytes
example/rocketmq-multiagent-base-adk/img_4.png | Bin 0 -> 1512815 bytes
example/rocketmq-multiagent-base-adk/pom.xml | 1 +
14 files changed, 1822 insertions(+), 48 deletions(-)
diff --git a/example/rocketmq-multiagent-base-adk/README.md
b/example/rocketmq-multiagent-base-adk/README.md
index dc72cf8..218f7c0 100644
--- a/example/rocketmq-multiagent-base-adk/README.md
+++ b/example/rocketmq-multiagent-base-adk/README.md
@@ -66,3 +66,21 @@ cd SupervisorAgent/target
java -DrocketMQInstanceID= -DworkAgentResponseTopic=WorkerAgentResponse
-DworkAgentResponseGroupID=CID_HOST_AGENT_LITE -DapiKey=
-DweatherAgentTaskTopic=WeatherAgentTask -DtravelAgentTaskTopic=TravelAgentTask
-DrocketMQAK= -DrocketMQSK= -jar
SupervisorAgent-2.1.1-SNAPSHOT-jar-with-dependencies.jar
```

+
+5.运行SupervisorAgent-Web
+
+```shell
+cd SupervisorAgent-Web/target
+```
+
+```shell
+java -DrocketMQInstanceID= -DworkAgentResponseTopic=WorkerAgentResponse
-DworkAgentResponseGroupID=CID_HOST_AGENT_LITE -DapiKey=
-DweatherAgentTaskTopic=WeatherAgentTask -DtravelAgentTaskTopic=TravelAgentTask
-DrocketMQAK= -DrocketMQSK= -jar SupervisorAgent-Web-1.0.3-SNAPSHOT.jar
+```
+- 打开浏览器,访问 http://localhost:9090
+- 下面的示例展示了以 RocketMQ 作为底层 Transport 时实现的异步通信以及断点重传功能
+- 在咨询"杭州明天天气怎么样"的过程中,点击"中断"按钮模拟网络中断,再点击"重连"按钮重新建立连接,数据流会恢复并重传
+
+
+
+
+
diff --git a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml
new file mode 100644
index 0000000..18556fc
--- /dev/null
+++ b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-parent</artifactId>
+ <version>3.2.5</version>
+ <relativePath/>
+ </parent>
+
+ <artifactId>SupervisorAgent-Web</artifactId>
+ <version>1.0.3-SNAPSHOT</version>
+
+    <!-- Build settings and dependency versions referenced below; each property is declared exactly once. -->
+    <properties>
+        <maven.compiler.source>17</maven.compiler.source>
+        <maven.compiler.target>17</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <jakarta.enterprise.cdi-api.version>4.1.0</jakarta.enterprise.cdi-api.version>
+        <jakarta.inject.jakarta.inject-api.version>2.0.1</jakarta.inject.jakarta.inject-api.version>
+        <jakarta.json-api.version>2.1.3</jakarta.json-api.version>
+        <jakarta.ws.rs-api.version>3.1.0</jakarta.ws.rs-api.version>
+        <a2a-java-sdk-version>0.3.3.Final</a2a-java-sdk-version>
+        <rocket-client-version>5.1.0</rocket-client-version>
+        <fast-json-version>1.2.83_noneautotype</fast-json-version>
+        <slfj-version>2.0.13</slfj-version>
+    </properties>
+
+    <!-- Each artifact is declared once; the second a2a-java-sdk-client entry was a verbatim duplicate. -->
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.github.a2asdk</groupId>
+            <artifactId>a2a-java-sdk-client</artifactId>
+            <version>${a2a-java-sdk-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.github.a2asdk</groupId>
+            <artifactId>a2a-java-sdk-reference-jsonrpc</artifactId>
+            <version>${a2a-java-sdk-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client-java</artifactId>
+            <version>${rocket-client-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>${fast-json-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>jakarta.enterprise</groupId>
+            <artifactId>jakarta.enterprise.cdi-api</artifactId>
+            <version>${jakarta.enterprise.cdi-api.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>jakarta.inject</groupId>
+            <artifactId>jakarta.inject-api</artifactId>
+            <version>${jakarta.inject.jakarta.inject-api.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>jakarta.json</groupId>
+            <artifactId>jakarta.json-api</artifactId>
+            <version>${jakarta.json-api.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>jakarta.ws.rs</groupId>
+            <artifactId>jakarta.ws.rs-api</artifactId>
+            <version>${jakarta.ws.rs-api.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slfj-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.adk</groupId>
+            <artifactId>google-adk</artifactId>
+            <version>0.3.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.example</groupId>
+            <artifactId>Common</artifactId>
+            <version>2.1.1-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-a2a</artifactId>
+            <version>1.0.5</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- Single declaration of the Spring Boot plugin; it names the class used as the application entry point. -->
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <configuration>
+                    <mainClass>org.example.RocketMQA2AChatApplication</mainClass>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git
a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/RocketMQA2AChatApplication.java
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/RocketMQA2AChatApplication.java
new file mode 100644
index 0000000..95d7642
--- /dev/null
+++
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/RocketMQA2AChatApplication.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.example;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class RocketMQA2AChatApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(RocketMQA2AChatApplication.class, args);
+ }
+}
diff --git
a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/common/TaskInfo.java
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/common/TaskInfo.java
new file mode 100644
index 0000000..ccd09ff
--- /dev/null
+++
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/common/TaskInfo.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.example.common;
+import reactor.core.publisher.Sinks;
+import reactor.core.publisher.Sinks.Many;
+
+/**
+ * Mutable holder describing one task forwarded to a worker agent, together with the sink that
+ * carries the task's output back to the caller.
+ */
+public class TaskInfo {
+    private String taskId;
+    private String taskDesc;
+    private String userId;
+    private String sessionId;
+    // AgentService replaces the sink when a stream is resubscribed while the client event consumer
+    // reads it; volatile makes the replacement visible to the reading thread.
+    private volatile Sinks.Many<String> sink;
+
+    /**
+     * Creates a fully populated task record.
+     *
+     * @param taskId    identifier of the task
+     * @param taskDesc  description of the task (AgentService passes the forwarded question)
+     * @param sessionId session the task belongs to
+     * @param userId    user the task belongs to
+     * @param sink      sink that receives the task's output
+     */
+    public TaskInfo(String taskId, String taskDesc, String sessionId, String userId, Sinks.Many<String> sink) {
+        this.taskId = taskId;
+        this.taskDesc = taskDesc;
+        this.sessionId = sessionId;
+        this.userId = userId;
+        this.sink = sink;
+    }
+
+    /** Creates an empty record; every field stays {@code null} until set. */
+    public TaskInfo() {
+    }
+
+    /** @return identifier of the task */
+    public String getTaskId() {
+        return taskId;
+    }
+
+    /** @param taskId identifier of the task */
+    public void setTaskId(String taskId) {
+        this.taskId = taskId;
+    }
+
+    /** @return description of the task */
+    public String getTaskDesc() {
+        return taskDesc;
+    }
+
+    /** @param taskDesc description of the task */
+    public void setTaskDesc(String taskDesc) {
+        this.taskDesc = taskDesc;
+    }
+
+    /** @return session the task belongs to */
+    public String getSessionId() {
+        return sessionId;
+    }
+
+    /** @param sessionId session the task belongs to */
+    public void setSessionId(String sessionId) {
+        this.sessionId = sessionId;
+    }
+
+    /** @return user the task belongs to */
+    public String getUserId() {
+        return userId;
+    }
+
+    /** @param userId user the task belongs to */
+    public void setUserId(String userId) {
+        this.userId = userId;
+    }
+
+    /** @return sink currently attached to the task, possibly {@code null} */
+    public Sinks.Many<String> getSink() {
+        return sink;
+    }
+
+    /** @param sink sink that should receive the task's output from now on */
+    public void setSink(Sinks.Many<String> sink) {
+        this.sink = sink;
+    }
+}
diff --git
a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/controller/A2AChatController.java
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/controller/A2AChatController.java
new file mode 100644
index 0000000..fba7ec3
--- /dev/null
+++
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/controller/A2AChatController.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.example.controller;
+
+import org.apache.commons.lang3.StringUtils;
+import org.example.service.AgentService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Flux;
+
+@RestController
+@RequestMapping("/")
+public class A2AChatController {
+ private static final Logger log =
LoggerFactory.getLogger(A2AChatController.class);
+
+ @Autowired
+ AgentService agentService;
+
+ @GetMapping(value = "/stream", produces =
MediaType.TEXT_EVENT_STREAM_VALUE)
+ public Flux<String> streamChat(@RequestParam() String question,
@RequestParam() String userId, @RequestParam() String sessionId) {
+ Flux<String> fluxResult = null;
+ try {
+ if (StringUtils.isEmpty(question) || StringUtils.isEmpty(userId)
|| StringUtils.isEmpty(sessionId)) {
+ log.error("streamChat param error, question: {}, userId: {},
sessionId: {}", question, userId, sessionId);
+ return null;
+ }
+ log.info("streamChat question: {}, userId: {}, sessionId: {}",
question, userId, sessionId);
+ fluxResult = agentService.streamChat(userId, sessionId, question);
+ } catch (Exception e) {
+ log.error("streamChat error, question: {}, userId: {}, sessionId:
{}, error: {}", question, userId, sessionId, e.getMessage());
+ }
+ return fluxResult;
+ }
+
+ @GetMapping("/closeStream")
+ public ResponseEntity<String> closeStreamChat(@RequestParam String userId,
@RequestParam String sessionId) {
+ log.info("closeStreamChat userId: {}, sessionId: {}", userId,
sessionId);
+ agentService.closeStreamChat(userId, sessionId);
+ return ResponseEntity.ok("Stream closed successfully");
+ }
+
+ @GetMapping(value = "/resubscribeStream", produces =
MediaType.TEXT_EVENT_STREAM_VALUE)
+ public Flux<String> resubscribeStreamChat(@RequestParam String userId,
@RequestParam String sessionId) {
+ Flux<String> fluxResult = null;
+ try {
+ if (StringUtils.isEmpty(sessionId) || StringUtils.isEmpty(userId))
{
+ log.error("resubscribeStreamChat param error, userId: {},
sessionId: {}", userId, sessionId);
+ return null;
+ }
+ log.info("resubscribeStreamChat userId: {}, sessionId: {}",
userId, sessionId);
+ fluxResult = agentService.resubscribeStream(userId, sessionId);
+ } catch (Exception e) {
+ log.error("resubscribeStreamChat error, userId: {}, sessionId: {}",
userId, sessionId);
+ }
+ return fluxResult;
+ }
+
+}
diff --git
a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/service/AgentService.java
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/service/AgentService.java
new file mode 100644
index 0000000..090c648
--- /dev/null
+++
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/service/AgentService.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.example.service;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import com.alibaba.fastjson.JSON;
+import autovalue.shaded.com.google.common.collect.ImmutableList;
+import com.google.adk.agents.BaseAgent;
+import com.google.adk.agents.LlmAgent;
+import com.google.adk.artifacts.InMemoryArtifactService;
+import com.google.adk.events.Event;
+import com.google.adk.runner.Runner;
+import com.google.adk.sessions.InMemorySessionService;
+import com.google.adk.sessions.Session;
+import com.google.genai.types.Content;
+import com.google.genai.types.Part;
+import common.Mission;
+import common.QWModel;
+import common.QWModelRegistry;
+import io.a2a.A2A;
+import io.a2a.client.Client;
+import io.a2a.client.ClientEvent;
+import io.a2a.client.TaskUpdateEvent;
+import io.a2a.client.http.A2ACardResolver;
+import io.a2a.spec.AgentCard;
+import io.a2a.spec.Artifact;
+import io.a2a.spec.Task;
+import io.a2a.spec.TaskIdParams;
+import io.a2a.spec.TaskState;
+import io.a2a.spec.TextPart;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Maybe;
+import jakarta.annotation.PostConstruct;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.a2a.common.RocketMQA2AConstant;
+import org.apache.rocketmq.a2a.transport.RocketMQTransport;
+import org.apache.rocketmq.a2a.transport.RocketMQTransportConfig;
+import org.example.common.TaskInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Sinks;
+import reactor.core.publisher.Sinks.Many;
+
+@Service
+public class AgentService {
+ private static final Logger log =
LoggerFactory.getLogger(AgentService.class);
+ private static final String AGENT_NAME = "SupervisorAgent";
+ private static final String APP_NAME = "rocketmq_a2a";
+ private static final String WEATHER_AGENT_NAME = "WeatherAgent";
+ private static final String WEATHER_AGENT_URL = "http://localhost:8080";
+ private static final String TRAVEL_AGENT_NAME = "TravelAgent";
+ private static final String TRAVEL_AGENT_URL = "http://localhost:8888";
+ private static final String WORK_AGENT_RESPONSE_TOPIC =
System.getProperty("workAgentResponseTopic");
+ private static final String WORK_AGENT_RESPONSE_GROUP_ID =
System.getProperty("workAgentResponseGroupID");
+ private static final String ROCKETMQ_INSTANCE_ID =
System.getProperty("rocketMQInstanceID");
+ private static final String ACCESS_KEY = System.getProperty("rocketMQAK");
+ private static final String SECRET_KEY = System.getProperty("rocketMQSK");
+ private static final String API_KEY = System.getProperty("apiKey");
+
+ private final Map<String /* agentName */, Client /* agentClient */>
AgentClientMap = new HashMap<>();
+ private final Map<String /* sessionId */, Session /* session */>
sessionMap = new HashMap<>();
+ private final Map<String /* taskId */, TaskInfo /* taskInfo */> taskMap =
new HashMap<>();
+ private final Map<String /* userId */, Map<String /* sessionId */,
List<TaskInfo> /* taskInfo */>> userSessionTaskListMap = new HashMap<>();
+
+ private InMemorySessionService sessionService;
+ private Runner runner;
+ private String lastQuestion = "";
+ @PostConstruct
+ public void init() {
+ if (!checkConfigParam()) {
+ log.error("please check the config param");
+ throw new RuntimeException("please check the config param");
+ }
+ BaseAgent baseAgent = initAgent(WEATHER_AGENT_NAME, TRAVEL_AGENT_NAME);
+ printSystemInfo("🚀 启动 QWen为底座模型的 " + AGENT_NAME +
",擅长处理天气问题与行程安排规划问题,在本例中使用RocketMQ LiteTopic版本实现多个Agent之间的通讯");
+ InMemoryArtifactService artifactService = new
InMemoryArtifactService();
+ sessionService = new InMemorySessionService();
+ runner = new Runner(baseAgent, APP_NAME, artifactService,
sessionService, /* memoryService= */ null);
+ initAgentCardInfo(ACCESS_KEY, SECRET_KEY, WEATHER_AGENT_NAME,
WEATHER_AGENT_URL);
+ initAgentCardInfo(ACCESS_KEY, SECRET_KEY, TRAVEL_AGENT_NAME,
TRAVEL_AGENT_URL);
+ }
+
+ private static boolean checkConfigParam() {
+ if (StringUtils.isEmpty(ROCKETMQ_INSTANCE_ID) ||
StringUtils.isEmpty(WORK_AGENT_RESPONSE_TOPIC) ||
StringUtils.isEmpty(WORK_AGENT_RESPONSE_GROUP_ID) ||
StringUtils.isEmpty(ACCESS_KEY) || StringUtils.isEmpty(SECRET_KEY) ||
StringUtils.isEmpty(API_KEY)) {
+ if (StringUtils.isEmpty(ROCKETMQ_INSTANCE_ID)) {
+ log.error("请配置RocketMQ 的实例信息 rocketMQInstanceID");
+ }
+ if (StringUtils.isEmpty(WORK_AGENT_RESPONSE_TOPIC)) {
+ log.error("请配置RocketMQ 的轻量消息Topic workAgentResponseTopic");
+ }
+ if (StringUtils.isEmpty(WORK_AGENT_RESPONSE_GROUP_ID)) {
+ log.error("请配置RocketMQ 的轻量消息消费者 workAgentResponseGroupID");
+ }
+ if (StringUtils.isEmpty(ACCESS_KEY)) {
+ log.error("请配置RocketMQ 的访问控制-用户名 rocketMQAK");
+ }
+ if (StringUtils.isEmpty(SECRET_KEY)) {
+ log.error("请配置RocketMQ 的访问控制-密码 rocketMQSK");
+ }
+ if (StringUtils.isEmpty(API_KEY)) {
+ log.error("请配置SupervisorAgent qwen-plus apiKey");
+ }
+ return false;
+ }
+ return true;
+ }
+
+ public Flux<String> streamChat(String userId, String sessionId, String
question) {
+ Session userSession = sessionMap.computeIfAbsent(sessionId, k -> {
+ return runner.sessionService().createSession(APP_NAME, userId,
null, sessionId).blockingGet();
+ });
+ Map<String, List<TaskInfo>> sessionTaskListMap =
userSessionTaskListMap.computeIfAbsent(userId, k -> new HashMap<>());
+ List<TaskInfo> taskList =
sessionTaskListMap.computeIfAbsent(sessionId, k -> new ArrayList<>());
+ Content userMsg = Content.fromParts(Part.fromText(question));
+ Flowable<Event> events = runner.runAsync(userId, userSession.id(),
userMsg);
+ Sinks.Many<String> sink =
Sinks.many().multicast().onBackpressureBuffer();
+ events.blockingForEach(event -> {
+ String content = event.stringifyContent();
+ dealEventContent(content, sink, taskList, userId, sessionId);
+ });
+ return Flux.from(sink.asFlux());
+ }
+
+ public void closeStreamChat(String userId, String sessionId) {
+ Map<String, List<TaskInfo>> sessionTaskListMap =
userSessionTaskListMap.computeIfAbsent(userId, k -> new HashMap<>());
+ List<TaskInfo> taskInfos =
sessionTaskListMap.computeIfAbsent(sessionId, k -> new ArrayList<>());
+ for (TaskInfo taskInfo : taskInfos) {
+ taskInfo.getSink().emitError(new RuntimeException("用户断开连接"),
Sinks.EmitFailureHandler.FAIL_FAST);
+ }
+ Collection<Client> clients = AgentClientMap.values();
+ Map<String, Object> metadata = new HashMap<>();
+ metadata.put(RocketMQA2AConstant.CLOSE_LITE_TOPIC, sessionId);
+ if (!CollectionUtils.isEmpty(clients)) {
+ for (Client client : clients) {
+ client.resubscribe(new TaskIdParams("", metadata));
+ log.info("closeStream userId: {}, sessionId: {}", userId,
sessionId);
+ }
+ }
+ }
+
+ public Flux<String> resubscribeStream(String userId, String sessionId) {
+ try {
+ Map<String, List<TaskInfo>> sessionTaskList =
userSessionTaskListMap.computeIfAbsent(userId, k -> new HashMap<>());
+ List<TaskInfo> taskInfoList =
sessionTaskList.computeIfAbsent(sessionId, k -> new ArrayList<>());
+ Sinks.Many<String> sink =
Sinks.many().multicast().onBackpressureBuffer();
+ if (CollectionUtils.isEmpty(taskInfoList)) {
+ return Flux.just("任务均已完成,请重新提问");
+ }
+ for (TaskInfo taskInfo : taskInfoList) {
+ taskInfo.setSink(sink);
+ }
+ Collection<Client> clients = AgentClientMap.values();
+ Map<String, Object> metadata = new HashMap<>();
+ metadata.put(RocketMQA2AConstant.LITE_TOPIC, sessionId);
+ if (!CollectionUtils.isEmpty(clients)) {
+ for (Client client : clients) {
+ try {
+ client.resubscribe(new TaskIdParams("", metadata));
+ } catch (Exception e) {
+ log.error("resubscribeStream client.resubscribe
error, userId: {}, sessionId: {}, error: {}", userId, sessionId,
e.getMessage());
+ }
+ }
+ }
+ return Flux.from(sink.asFlux());
+ } catch (Exception e) {
+ log.error("resubscribeStream error, userId: {}, sessionId: {},
error: {}", userId, sessionId, e.getMessage());
+ }
+ return null;
+ }
+
+ private void dealEventContent(String content, Sinks.Many<String> sink,
List<TaskInfo> taskList, String userId, String sessionId) {
+ if (StringUtils.isEmpty(content) || null == sink ||
StringUtils.isEmpty(userId) || StringUtils.isEmpty(sessionId)) {
+ return;
+ }
+ String taskId = UUID.randomUUID().toString();
+ if (content.startsWith("{")) {
+ try {
+ Mission mission = JSON.parseObject(content, Mission.class);
+ if (null != mission) {
+ TaskInfo taskInfo = taskMap.computeIfAbsent(taskId, k ->
{return new TaskInfo(taskId, mission.getMessageInfo(), sessionId, userId,
sink);});
+ if (null != taskList) {
+ taskList.add(taskInfo);
+ }
+ log.info("转发请求到其他的Agent, 等待其响应,Agent: {}, 问题: {}",
mission.getAgent(), mission.getMessageInfo());
+ emitMessage(sink, "******" + AGENT_NAME + "转发请求到其他的Agent,
等待其响应,Agent: " + mission.getAgent() + ",问题: " + mission.getMessageInfo(),
false);
+ dealMissionByMessage(mission, taskId, sessionId);
+ }
+ } catch (Exception e) {
+ log.error("解析过程出现异常, " + e.getMessage());
+ }
+ } else {
+ emitMessage(sink, content, true);
+ }
+ }
+
+ private void dealMissionByMessage(Mission mission, String taskId, String
sessionId) {
+ if (null == mission || StringUtils.isEmpty(mission.getAgent()) ||
StringUtils.isEmpty(mission.getMessageInfo()) || StringUtils.isEmpty(taskId) ||
StringUtils.isEmpty(sessionId)) {
+ log.error("dealMissionByMessage param error, mission: {}, taskId:
{}, sessionId: {}", JSON.toJSONString(mission), taskId, sessionId);
+ return;
+ }
+ try {
+ String agentName = mission.getAgent().replaceAll(" ", "");
+ Client client = AgentClientMap.get(agentName);
+
client.sendMessage(A2A.createUserTextMessage(mission.getMessageInfo(),
sessionId, taskId));
+ log.info("dealMissionByMessage message: {}",
mission.getMessageInfo());
+ } catch (Exception e) {
+ log.error("dealMissionByMessage error, mission: {}, taskId: {},
sessionId: {}, error: {}", JSON.toJSONString(mission), taskId, sessionId,
e.getMessage());
+ }
+ }
+
+ public BaseAgent initAgent(String weatherAgent, String travelAgent) {
+ if (StringUtils.isEmpty(weatherAgent) ||
StringUtils.isEmpty(travelAgent)) {
+ log.error("initAgent 参数缺失,请补充天气助手weatherAgent、行程安排助手travelAgent");
+ return null;
+ }
+ QWModel qwModel = QWModelRegistry.getModel(API_KEY);
+ return LlmAgent.builder()
+ .name(APP_NAME)
+ .model(qwModel)
+ .description("你是一位专业的行程规划专家")
+ .instruction("# 角色\n"
+ +
"你是一位专业的行程规划专家,擅长任务分解与协调安排。你的主要职责是帮助用户制定详细的旅行计划,确保他们的旅行体验既愉快又高效。在处理用户的行程安排相关问题时,你需要首先收集必要的信息,如目的地、时间等,并根据这些信息进行进一步的查询和规划。\n"
+ + "\n"
+ + "## 技能\n"
+ + "### 技能 1: 收集必要信息\n"
+ + "- 询问用户关于目的地、出行时间\n"
+ + "- 确保收集到的信息完整且准确。\n"
+ + "\n"
+ + "### 技能 2: 查询天气信息\n"
+ + "- 使用" + weatherAgent + "工具查询目的地的天气情况。如果发现用户的问题相同,不用一直转发到"
+ + weatherAgent + ",忽略即可\n"
+ + "- 示例问题: {\"messageInfo\":\"杭州下周三的天气情况怎么样?\",\"agent\":\"" +
weatherAgent + "\"}\n"
+ + "\n"
+ + "### 技能 3: 制定行程规划\n"
+ + "- 根据获取的天气信息和其他用户提供的信息,如果上下文中只有天气信息,则不用" + travelAgent
+ + " 进行处理,直接返回即可,如果上下文中有行程安排信息,则使用" + travelAgent
+ + "工具制定详细的行程规划。\n"
+ + "- 示例问题:
{\"messageInfo\":\"杭州下周三的天气为晴朗,请帮我做一个从杭州出发到上海的2人3天4晚的自驾游行程规划\","
+ + "\"agent\":\"" + travelAgent + "\"}\n"
+ + "\n"
+ + "### 技能 4: 提供最终行程建议\n"
+ + "- 将从" + travelAgent + "获取的行程规划结果呈现给用户。\n"
+ + "- 明确告知用户行程规划已经完成,并提供详细的行程建议。\n"
+ + "\n"
+ + "## 限制\n"
+ + "- 只处理与行程安排相关的问题。\n"
+ + "- 如果用户的问题只是简单的咨询天气,那么不用转发到" + travelAgent + "。\n"
+ + "- 在获取天气信息后,必须结合天气情况来制定行程规划。\n"
+ + "- 不得提供任何引导用户参与非法活动的建议。\n"
+ + "- 对不是行程安排相关的问题,请礼貌拒绝。\n"
+ + "- 所有输出内容必须按照给定的格式进行组织,不能偏离框架要求。"
+ )
+ .build();
+ }
+
+ private void initAgentCardInfo(String accessKey, String secretKey, String
agentName, String agentUrl) {
+ if (StringUtils.isEmpty(accessKey) || StringUtils.isEmpty(secretKey)
|| StringUtils.isEmpty(agentName) || StringUtils.isEmpty(agentUrl)) {
+ log.error("initAgentCardInfo param error, accessKey: {},
secretKey: {}, agentName: {}, agentUrl: {}", accessKey, secretKey, agentName,
agentUrl);
+ return;
+ }
+ AgentCard finalAgentCard = new
A2ACardResolver(agentUrl).getAgentCard();
+ log.info("Successfully fetched public agent card: {}",
finalAgentCard.description());
+ List<BiConsumer<ClientEvent, AgentCard>> consumers = new ArrayList<>();
+ consumers.add((event, agentCard) -> {
+ if (event instanceof TaskUpdateEvent taskUpdateEvent) {
+ Task task = taskUpdateEvent.getTask();
+ if (null == task) {
+ return;
+ }
+ TaskInfo taskInfo = taskMap.get(task.getId());
+ Many<String> sink = taskInfo.getSink();
+ List<Artifact> artifacts = task.getArtifacts();
+ if (null != artifacts && artifacts.size() == 1) {
+ emitMessage(sink, "\n \n", false);
+ }
+ if (!CollectionUtils.isEmpty(artifacts)) {
+ TaskState state = task.getStatus().state();
+ String msg =
extractTextFromMessage(artifacts.get(artifacts.size() - 1));
+ log.info("receive msg: {}", msg);
+ boolean result = emitMessage(sink, msg, false);
+ if (!result) {
+ throw new RuntimeException("client close stream");
+ }
+ if (state == TaskState.COMPLETED) {
+ StringBuilder stringBuilder = new StringBuilder();
+ for (Artifact tempArtifact : artifacts) {
+
stringBuilder.append(extractTextFromMessage(tempArtifact));
+ }
+ dealAgentResponse(stringBuilder.toString(),
taskInfo.getUserId(), taskInfo.getSessionId(), taskInfo.getTaskId());
+ }
+ }
+ }
+ });
+
+ Consumer<Throwable> streamingErrorHandler = (error) -> {
+ log.error("Streaming error: {}", error.getMessage());
+ };
+ //config rocketmq info
+ RocketMQTransportConfig rocketMQTransportConfig = new
RocketMQTransportConfig();
+ rocketMQTransportConfig.setRocketMQInstanceID(ROCKETMQ_INSTANCE_ID);
+ rocketMQTransportConfig.setAccessKey(accessKey);
+ rocketMQTransportConfig.setSecretKey(secretKey);
+
rocketMQTransportConfig.setWorkAgentResponseGroupID(WORK_AGENT_RESPONSE_GROUP_ID);
+
rocketMQTransportConfig.setWorkAgentResponseTopic(WORK_AGENT_RESPONSE_TOPIC);
+ Client client = Client.builder(finalAgentCard)
+ .addConsumers(consumers)
+ .streamingErrorHandler(streamingErrorHandler)
+ .withTransport(RocketMQTransport.class, rocketMQTransportConfig)
+ .build();
+ AgentClientMap.put(agentName, client);
+ log.info("init success");
+ }
+
+ private static String extractTextFromMessage(Artifact artifact) {
+ if (null == artifact) {
+ return "";
+ }
+ List<io.a2a.spec.Part<?>> parts = artifact.parts();
+ if (CollectionUtils.isEmpty(parts)) {
+ return "";
+ }
+ StringBuilder textBuilder = new StringBuilder();
+ for (io.a2a.spec.Part part : parts) {
+ if (part instanceof TextPart textPart) {
+ textBuilder.append(textPart.getText());
+ }
+ }
+ return textBuilder.toString();
+ }
+
+ private void dealAgentResponse(String result, String userId, String
sessionId, String taskId) {
+ if (StringUtils.isEmpty(result)) {
+ return;
+ }
+ Maybe<Session> sessionMaybe = sessionService.getSession(APP_NAME,
userId, sessionId, Optional.empty());
+ Event event = Event.builder()
+ .id(UUID.randomUUID().toString())
+ .invocationId(UUID.randomUUID().toString())
+ .author(APP_NAME)
+ .content(buildContent(result))
+ .build();
+ Session session = sessionMaybe.blockingGet();
+ sessionService.appendEvent(session, event);
+ Content userMsg = Content.fromParts(Part.fromText(result));
+ Flowable<Event> events = runner.runAsync(userId, sessionId, userMsg);
+ events.blockingForEach(eventSub -> {
+ boolean equals = lastQuestion.equals(eventSub.stringifyContent());
+ if (equals) {
+ return;
+ }
+ lastQuestion = eventSub.stringifyContent();
+ String content = lastQuestion;
+ TaskInfo taskInfo = taskMap.get(taskId);
+ Many<String> sink = taskInfo.getSink();
+ if (!StringUtils.isEmpty(content)) {
+ if (content.startsWith("{")) {
+ try {
+ Mission mission = JSON.parseObject(content,
Mission.class);
+ if (null != mission &&
!StringUtils.isEmpty(mission.getMessageInfo()) &&
!StringUtils.isEmpty(mission.getAgent())) {
+ log.info("转发到其他的Agent, 等待其他Agent响应,Agent: {}, 问题:
{}", mission.getAgent(), mission.getMessageInfo());
+ emitMessage(sink,"\n \n ******" + AGENT_NAME + "
转发请求到其他的Agent, 等待其响应,Agent: " + mission.getAgent() + ", 问题: " +
mission.getMessageInfo(), false);
+ dealMissionByMessage(mission, taskId, sessionId);
+ }
+ } catch (Exception e) {
+ log.error("parse result error: {}", e.getMessage());
+ }
+ } else {
+ sink.tryEmitComplete();
+ completeTask(taskInfo);
+ }
+ }
+ });
+ }
+
+ /**
+ * 对Task相关的资源进行清理
+ * @param taskInfo
+ */
+ private void completeTask(TaskInfo taskInfo) {
+ if (null == taskInfo || StringUtils.isEmpty(taskInfo.getTaskId())) {
+ log.error("completeTask taskInfo is null or taskId is empty");
+ return;
+ }
+ String taskId = taskInfo.getTaskId();
+ taskMap.remove(taskId);
+ log.info("completeTask taskMap clear success taskId: {}", taskId);
+ Map<String, List<TaskInfo>> sessionTaskListMap =
userSessionTaskListMap.get(taskInfo.getUserId());
+ if (null != sessionTaskListMap) {
+ List<TaskInfo> taskInfos =
sessionTaskListMap.get(taskInfo.getSessionId());
+ if (CollectionUtils.isEmpty(taskInfos)) {
+ return;
+ }
+ boolean result = taskInfos.removeIf(next ->
next.getTaskId().equals(taskId));
+ log.info("completeTask userSessionTaskListMap clear success,
taskId: {}, result: {}", taskId, result);
+ }
+ }
+
+ private static Content buildContent(String content) {
+ if (StringUtils.isEmpty(content)) {
+ return null;
+ }
+ return Content.builder()
+ .role(APP_NAME)
+ .parts(ImmutableList.of(Part.builder().text(content).build()))
+ .build();
+ }
+
+ private static void printSystemInfo(String message) {
+ System.out.println("\u001B[34m[SYSTEM] " + message + "\u001B[0m");
+ }
+
+ private static boolean emitMessage(Sinks.Many<String> sink, String msg,
boolean isFinish) {
+ Sinks.EmitResult result = sink.tryEmitNext(msg);
+ switch (result) {
+ case OK:
+ log.info("📤 成功发送: {}", msg);
+ break;
+ case FAIL_OVERFLOW:
+ case FAIL_CANCELLED:
+ case FAIL_TERMINATED:
+ log.error("🛑 上游检测到问题,停止发送。原因: {}", result);
+ return false;
+ default:
+ log.error("⚠️ 发送状态: {}", result);
+ }
+ if (isFinish) {
+ sink.tryEmitComplete();
+ }
+ return true;
+ }
+
+}
diff --git
a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/resources/application.yml
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/resources/application.yml
new file mode 100644
index 0000000..78f5b08
--- /dev/null
+++
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/resources/application.yml
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+server:
+ port: 9090 # Change this to the port you want, e.g. 9090, 8081, or 80
diff --git
a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/resources/static/index.html
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/resources/static/index.html
new file mode 100644
index 0000000..cb4f9a4
--- /dev/null
+++
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/resources/static/index.html
@@ -0,0 +1,964 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<!DOCTYPE html>
+<html lang="zh-CN">
+<head>
+ <meta charset="UTF-8">
+ <meta name="viewport" content="width=device-width, initial-scale=1.0">
+ <title>RocketMQ A2A </title>
+ <!-- 引入marked.js用于Markdown渲染 -->
+ <script
src="https://cdn.jsdelivr.net/npm/[email protected]/marked.min.js"></script>
+ <!-- 引入highlight.js用于代码高亮 -->
+ <link rel="stylesheet"
href="https://cdn.jsdelivr.net/npm/highlight.js@11.9.0/styles/github.min.css">
+ <script
src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/highlight.min.js"></script>
+ <style>
+ * {
+ margin: 0;
+ padding: 0;
+ box-sizing: border-box;
+ }
+
+ body {
+ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI',
'Roboto', 'Helvetica Neue', Arial, sans-serif;
+ background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
+ min-height: 100vh;
+ display: flex;
+ justify-content: center;
+ align-items: center;
+ padding: 20px;
+ }
+
+ .chat-container {
+ background: white;
+ border-radius: 20px;
+ box-shadow: 0 20px 40px rgba(0, 0, 0, 0.1);
+ width: 100%;
+ max-width: 1400px;
+ height: 800px;
+ display: flex;
+ flex-direction: column;
+ overflow: hidden;
+ }
+
+ .chat-header {
+ background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
+ color: white;
+ padding: 20px;
+ text-align: center;
+ font-size: 24px;
+ font-weight: 600;
+ position: relative;
+ }
+
+ .header-controls {
+ position: absolute;
+ top: 50%;
+ right: 20px;
+ transform: translateY(-50%);
+ display: flex;
+ gap: 10px;
+ align-items: center;
+ }
+
+ .header-input {
+ padding: 5px 10px;
+ border-radius: 4px;
+ border: 1px solid #fff;
+ background: rgba(255, 255, 255, 0.9);
+ width: 120px;
+ font-size: 14px;
+ }
+
+ .header-button {
+ padding: 5px 10px;
+ border-radius: 4px;
+ border: 1px solid #fff;
+ background: rgba(255, 255, 255, 0.9);
+ color: #764ba2;
+ font-size: 14px;
+ cursor: pointer;
+ }
+
+ .header-button:hover {
+ background: rgba(255, 255, 255, 1);
+ }
+
+ .header-button:disabled {
+ opacity: 0.6;
+ cursor: not-allowed;
+ }
+
+ .chat-messages {
+ flex: 1;
+ padding: 20px;
+ overflow-y: auto;
+ background: #f8f9fa;
+ display: flex;
+ flex-direction: column;
+ gap: 15px;
+ }
+
+ .message {
+ max-width: 80%;
+ padding: 12px 16px;
+ border-radius: 18px;
+ word-wrap: break-word;
+ line-height: 1.4;
+ }
+
+ .user-message {
+ background: #007bff;
+ color: white;
+ align-self: flex-end;
+ border-bottom-right-radius: 4px;
+ }
+
+ .bot-message {
+ background: white;
+ color: #333;
+ align-self: flex-start;
+ border: 1px solid #e9ecef;
+ border-bottom-left-radius: 4px;
+ box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
+ }
+
+ .typing-indicator {
+ background: white;
+ color: #666;
+ align-self: flex-start;
+ border: 1px solid #e9ecef;
+ border-bottom-left-radius: 4px;
+ box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
+ display: none;
+ }
+
+ .typing-dots {
+ display: inline-block;
+ animation: typing 1.4s infinite;
+ }
+
+ @keyframes typing {
+ 0%, 60%, 100% { opacity: 0.3; }
+ 30% { opacity: 1; }
+ }
+
+ .chat-input-container {
+ padding: 20px;
+ background: white;
+ border-top: 1px solid #e9ecef;
+ display: flex;
+ gap: 10px;
+ }
+
+ .chat-input {
+ flex: 1;
+ padding: 12px 16px;
+ border: 2px solid #e9ecef;
+ border-radius: 25px;
+ font-size: 16px;
+ outline: none;
+ transition: border-color 0.3s ease;
+ }
+
+ .chat-input:focus {
+ border-color: #007bff;
+ }
+
+ .send-button {
+ background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
+ color: white;
+ border: none;
+ border-radius: 50%;
+ width: 50px;
+ height: 50px;
+ cursor: pointer;
+ display: flex;
+ align-items: center;
+ justify-content: center;
+ font-size: 18px;
+ transition: transform 0.2s ease;
+ }
+
+ .send-button:hover {
+ transform: scale(1.05);
+ }
+
+ .send-button:disabled {
+ opacity: 0.6;
+ cursor: not-allowed;
+ transform: none;
+ }
+
+ .error-message {
+ background: #f8d7da;
+ color: #721c24;
+ border: 1px solid #f5c6cb;
+ border-radius: 18px;
+ padding: 12px 16px;
+ margin: 10px 0;
+ text-align: center;
+ }
+
+ .clear-button {
+ background: #6c757d;
+ color: white;
+ border: none;
+ border-radius: 20px;
+ padding: 8px 16px;
+ cursor: pointer;
+ font-size: 14px;
+ margin-left: 10px;
+ }
+
+ .clear-button:hover {
+ background: #5a6268;
+ }
+
+ /* Markdown样式 */
+ .bot-message h1, .bot-message h2, .bot-message h3, .bot-message h4,
.bot-message h5, .bot-message h6 {
+ margin: 16px 0 8px 0;
+ font-weight: 600;
+ line-height: 1.25;
+ }
+
+ .bot-message h1 { font-size: 1.5em; border-bottom: 1px solid #eaecef;
padding-bottom: 8px; }
+ .bot-message h2 { font-size: 1.3em; border-bottom: 1px solid #eaecef;
padding-bottom: 6px; }
+ .bot-message h3 { font-size: 1.1em; }
+ .bot-message h4 { font-size: 1em; }
+ .bot-message h5 { font-size: 0.9em; }
+ .bot-message h6 { font-size: 0.8em; color: #6a737d; }
+
+ .bot-message p {
+ margin: 8px 0;
+ line-height: 1.6;
+ }
+
+ .bot-message ul, .bot-message ol {
+ margin: 8px 0;
+ padding-left: 24px;
+ }
+
+ .bot-message li {
+ margin: 4px 0;
+ line-height: 1.6;
+ }
+
+ .bot-message blockquote {
+ margin: 16px 0;
+ padding: 0 16px;
+ color: #6a737d;
+ border-left: 4px solid #dfe2e5;
+ background: #f6f8fa;
+ border-radius: 0 4px 4px 0;
+ }
+
+ .bot-message code {
+ background: #f6f8fa;
+ border-radius: 3px;
+ font-size: 85%;
+ margin: 0;
+ padding: 2px 4px;
+ font-family: 'SFMono-Regular', Consolas, 'Liberation Mono', Menlo,
monospace;
+ }
+
+ .bot-message pre {
+ background: #f6f8fa;
+ border-radius: 6px;
+ font-size: 85%;
+ line-height: 1.45;
+ overflow: auto;
+ padding: 16px;
+ margin: 16px 0;
+ }
+
+ .bot-message pre code {
+     /* Code inside a <pre> block inherits the block's look instead of the inline-code chip style. */
+     background: transparent;
+     border: 0;
+     display: inline;
+     line-height: inherit;
+     margin: 0;
+     /* "auto" is not a valid max-width value; "none" is the keyword for "no limit". */
+     max-width: none;
+     overflow: visible;
+     padding: 0;
+     word-wrap: normal;
+ }
+
+ .bot-message table {
+ border-collapse: collapse;
+ border-spacing: 0;
+ margin: 16px 0;
+ width: 100%;
+ }
+
+ .bot-message table th,
+ .bot-message table td {
+ border: 1px solid #dfe2e5;
+ padding: 6px 13px;
+ text-align: left;
+ }
+
+ .bot-message table th {
+ background: #f6f8fa;
+ font-weight: 600;
+ }
+
+ .bot-message table tr:nth-child(2n) {
+ background: #f6f8fa;
+ }
+
+ .bot-message hr {
+ border: 0;
+ border-top: 1px solid #eaecef;
+ margin: 24px 0;
+ }
+
+ .bot-message a {
+ color: #0366d6;
+ text-decoration: none;
+ }
+
+ .bot-message a:hover {
+ text-decoration: underline;
+ }
+
+ .bot-message strong {
+ font-weight: 600;
+ }
+
+ .bot-message em {
+ font-style: italic;
+ }
+
+ .bot-message img {
+ max-width: 100%;
+ height: auto;
+ border-radius: 6px;
+ margin: 8px 0;
+ }
+ /* === 新增的 Toast 样式 === */
+ .toast-container {
+ position: fixed;
+ top: 0;
+ left: 0;
+ width: 100%;
+ height: 100%;
+ pointer-events: none; /* 允许点击穿透 */
+ z-index: 9999;
+ display: flex;
+ justify-content: center;
+ align-items: flex-start; /* 从顶部开始,避免遮挡 */
+ padding-top: 20vh; /* 距离顶部 20% 视口高度,避免太居中影响操作 */
+ }
+
+ .toast {
+ background: rgba(0, 0, 0, 0.85);
+ color: white;
+ padding: 16px 24px;
+ border-radius: 12px;
+ box-shadow: 0 8px 24px rgba(0, 0, 0, 0.3);
+ transform: translateY(-20px);
+ animation: slideInCenter 0.3s forwards, fadeOutCenter 0.3s 2.7s
forwards;
+ opacity: 0;
+ max-width: 80%;
+ text-align: center;
+ pointer-events: auto; /* 恢复点击事件 */
+ font-size: 16px;
+ line-height: 1.4;
+ }
+
+ @keyframes slideInCenter {
+ to {
+ transform: translateY(0);
+ opacity: 1;
+ }
+ }
+
+ @keyframes fadeOutCenter {
+ to {
+ opacity: 0;
+ transform: translateY(-20px);
+ }
+ }
+
+ /* 成功/警告/错误样式 */
+ .toast.success { background: rgba(40, 167, 69, 0.95); }
+ .toast.warning { background: rgba(255, 193, 7, 0.95); color: #212529; }
+ .toast.error { background: rgba(220, 53, 69, 0.95); }
+
+ </style>
+</head>
+<body>
+<div class="chat-container">
+ <div class="chat-header">
+ 🤖 RocketMQ A2A
+ <div class="header-controls">
+ <input type="text" id="userIdInput" class="header-input"
placeholder="UserId" value="rocketmq">
+ <input type="text" id="sessionIdInput" class="header-input"
placeholder="SessionId" value="1">
+ <button id="confirmButton" class="header-button">登录</button>
+ </div>
+ </div>
+
+ <div class="chat-messages" id="chatMessages">
+ <div class="message bot-message">
+ 您好!有什么可以帮助您的吗?擅长处理天气问题与行程安排规划问题,在本例中使用RocketMQ
LiteTopic版本实现多个Agent之间的通讯<br>
+ 询问天气: '杭州明天的天气情况怎么样'<br>
+ 帮忙安排行程: '帮我做一个明天杭州周边自驾游方案'
+ </div>
+ </div>
+
+
+ <div class="typing-indicator" id="typingIndicator">
+ <span class="typing-dots">正在思考中...</span>
+ </div>
+
+ <div class="chat-input-container">
+ <input
+ type="text"
+ id="chatInput"
+ class="chat-input"
+ placeholder="输入你的问题..."
+ autocomplete="off"
+ >
+ <button id="sendButton" class="send-button">➤</button>
+ <button id="clearButton" class="clear-button">清空</button>
+ <button id="close-Button" class="send-button">断开</button>
+ <button id="resubscribe-Button" class="send-button">重连</button>
+ <!-- <button id="testMarkdownButton"
class="clear-button">测试Markdown</button>-->
+ </div>
+ <div class="toast-container" id="toastContainer"></div>
+</div>
+
+<script>
+ class ObjectListStorage {
+ constructor(key) {
+ this.key = key;
+ }
+
+ // 获取当前列表
+ get() {
+ try {
+ const data = localStorage.getItem(this.key);
+ return data ? JSON.parse(data) : [];
+ } catch (error) {
+ console.error('读取失败,返回空数组:', error);
+ return [];
+ }
+ }
+
+ // 添加单个消息对象
+ addMessage(userType, msg, isEnd = false) {
+ // 验证参数类型
+ if (typeof userType !== 'string') {
+ throw new Error('userType 必须是字符串');
+ }
+ if (typeof msg !== 'string') {
+ throw new Error('msg 必须是字符串');
+ }
+ if (typeof isEnd !== 'boolean') {
+ throw new Error('isEnd 必须是布尔值');
+ }
+
+ const message = { userType, msg, isEnd };
+ const list = this.get();
+ list.push(message);
+ this.save(list);
+ }
+
+ // 添加自定义对象(更灵活)
+ addObject(obj) {
+ // 验证必需属性
+ if (!obj || typeof obj !== 'object') {
+ throw new Error('参数必须是对象');
+ }
+ if (typeof obj.userType !== 'string') {
+ throw new Error('对象必须包含字符串类型的 userType 属性');
+ }
+ if (typeof obj.msg !== 'string') {
+ throw new Error('对象必须包含字符串类型的 msg 属性');
+ }
+ if (typeof obj.isEnd !== 'boolean') {
+ throw new Error('对象必须包含布尔类型的 isEnd 属性');
+ }
+
+ const list = this.get();
+ list.push(obj);
+ this.save(list);
+ }
+
+ // 批量添加消息对象
+ addMessages(messages) {
+ if (!Array.isArray(messages)) {
+ throw new Error('参数必须是数组');
+ }
+
+ const validatedMessages = messages.map(msg => {
+ if (typeof msg.userType !== 'string' ||
+ typeof msg.msg !== 'string' ||
+ typeof msg.isEnd !== 'boolean') {
+ throw new Error('数组中的每个对象都必须包含 userType(string),
msg(string), isEnd(boolean)');
+ }
+ return msg;
+ });
+
+ const list = this.get();
+ list.push(...validatedMessages);
+ this.save(list);
+ }
+
+ // 删除元素(根据条件删除)
+ removeByCondition(conditionFn) {
+ const list = this.get().filter(item => !conditionFn(item));
+ this.save(list);
+ }
+
+ // 删除最后一条消息
+ removeLast() {
+ const list = this.get();
+ if (list.length > 0) {
+ list.pop();
+ this.save(list);
+ }
+ }
+
+ // 清空列表
+ clear() {
+ localStorage.removeItem(this.key);
+ }
+
+ // 保存到 localStorage
+ save(list) {
+ try {
+ localStorage.setItem(this.key, JSON.stringify(list));
+ } catch (error) {
+ console.error('存储失败:', error);
+ throw error;
+ }
+ }
+ }
+
+ // Default identity values and page-level state shared by the code below.
+ const defaultUserId = "rocketmq";
+ const defaultSessionId = 1;
+ // History storage key in the form "<userId>@<sessionId>".
+ const defaultKey = defaultUserId +"@"+ defaultSessionId;
+ // Message history store; declared with let because resubscribeMessage() replaces it.
+ let history = new ObjectListStorage(defaultKey);
+ // Set to true by sendMessage().
+ let showHistory = false;
+ // Set by streamResponse(): true when the last streaming request threw.
+ let sendMessageError = false;
+
+ class ChatBot {
+ constructor() {
+ this.chatMessages = document.getElementById('chatMessages');
+ this.chatInput = document.getElementById('chatInput');
+ this.sendButton = document.getElementById('sendButton');
+ this.clearButton = document.getElementById('clearButton');
+ this.resubscribeButton =
document.getElementById('resubscribe-Button');
+ this.closeButton = document.getElementById('close-Button');
+ // this.testMarkdownButton =
document.getElementById('testMarkdownButton');
+ this.typingIndicator = document.getElementById('typingIndicator');
+
+ // 新增的元素
+ this.userIdInput = document.getElementById('userIdInput');
+ this.sessionIdInput = document.getElementById('sessionIdInput');
+ this.confirmButton = document.getElementById('confirmButton');
+
+ this.isStreaming = false;
+ this.currentBotMessage = null;
+ this.markdownBuffer = '';
+
+ // 配置marked选项
+ this.configureMarked();
+
+ this.initEventListeners();
+ }
+
+ configureMarked() {
+ // 配置marked选项
+ marked.setOptions({
+ highlight: function(code, lang) {
+ if (lang && hljs.getLanguage(lang)) {
+ try {
+ return hljs.highlight(code, { language: lang
}).value;
+ } catch (err) {}
+ }
+ try {
+ return hljs.highlightAuto(code).value;
+ } catch (err) {}
+ return code;
+ },
+ breaks: true, // 将单个换行符转换为<br>
+ gfm: true, // 启用GitHub风格Markdown
+ pedantic: false,
+ sanitize: false,
+ smartLists: true,
+ smartypants: false
+ });
+ }
+
+ initEventListeners() {
+ this.sendButton.addEventListener('click', () =>
this.sendMessage());
+ this.resubscribeButton.addEventListener('click', () =>
this.resubscribeMessage());
+ this.closeButton.addEventListener('click', () =>
this.closeStream());
+ this.clearButton.addEventListener('click', () => this.clearChat());
+ // this.testMarkdownButton.addEventListener('click', () =>
this.testMarkdown());
+ this.confirmButton.addEventListener('click', () =>
this.handleConfirm());
+ this.chatInput.addEventListener('keypress', (e) => {
+ if (e.key === 'Enter' && !e.shiftKey) {
+ e.preventDefault();
+ this.sendMessage();
+ }
+ });
+ }
+
+ async sendMessage() {
+ showHistory = true;
+ const message = this.chatInput.value.trim();
+ if (this.isStreaming) {
+ showToast("上次的任务正在处理中");
+ return;
+ }
+ if (!message.trim()) {
+ showToast("请不要输入空值");
+ return;
+ }
+
+ // 添加用户消息
+ this.addMessage(message, 'user');
+ this.chatInput.value = '';
+ this.setLoading(false);
+ //添加用户问题
+ // history.add(message);
+ history.addMessage('user', message, true);
+ try {
+ await this.streamResponse(message);
+ } catch (error) {
+ console.error('Error:', error);
+ // this.addMessage('抱歉,发生了错误,请稍后重试。', 'bot');
+ } finally {
+ this.setLoading(false);
+ }
+ }
+
+ async resubscribeMessage() {
+ // 获取当前输入框的值
+ const userId = this.userIdInput.value || "rocketmq";
+ const sessionId = this.sessionIdInput.value || "1";
+ history = new ObjectListStorage(userId + "@" + sessionId);
+ const messages = history.get();
+ if (messages.length === 0) {
+ return;
+ }
+ // 遍历所有消息对象
+ let str = "";
+ let lastBotNotFinish = false;
+ // if (!showHistory) {
+ // showHistory = true;
+ // messages.forEach((message, index) => {
+ // // 条件1: 判断用户类型
+ // if (message.userType === 'user') {
+ // if (lastBotNotFinish) {
+ // this.addMessage(str, 'bot');
+ // str = "";
+ // }
+ // this.addMessage(message.msg, 'user');
+ // } else if (message.userType === 'bot' && message.isEnd)
{
+ // str += message.msg;
+ // this.addMessage(str, 'bot');
+ // lastBotNotFinish = false;
+ // str = "";
+ // } else if (message.userType === 'bot' &&
!message.isEnd) {
+ // str += message.msg;
+ // lastBotNotFinish = true;
+ // }
+ // });
+ // }
+ this.setLoading(false);
+ showToast("重新连接")
+ try {
+ await this.streamResubscribeResponse("xx", userId, sessionId);
+ } catch (error) {
+ console.error('Error:', error);
+ } finally {
+ this.setLoading(false);
+ }
+ }
+
+ async streamResponse(query) {
+ sendMessageError = false;
+ // 获取当前输入框的值
+ const userId = this.userIdInput.value || defaultUserId;
+ const sessionId = this.sessionIdInput.value || defaultSessionId;
+
+ this.isStreaming = true;
+ this.currentBotMessage = this.addMessage('', 'bot');
+ this.markdownBuffer = '';
+
+ try {
+ const response = await
fetch(`/stream?question=${encodeURIComponent(query)}&userId=${userId}&sessionId=${sessionId}`,
{
+ method: 'GET',
+ headers: {
+ 'Accept': 'text/event-stream',
+ 'Cache-Control': 'no-cache'
+ }
+ });
+
+ if (!response.ok) {
+ throw new Error(`HTTP error! status: ${response.status}`);
+ }
+
+ const reader = response.body.getReader();
+ const decoder = new TextDecoder();
+ let buffer = '';
+
+ while (true) {
+ const { done, value } = await reader.read();
+ if (done) break;
+
+ buffer += decoder.decode(value, { stream: true });
+
+ // 按SSE协议处理:查找完整的事件
+ let eventEndIndex = buffer.indexOf('\n\n');
+ while (eventEndIndex !== -1) {
+ const eventData = buffer.substring(0, eventEndIndex);
+ buffer = buffer.substring(eventEndIndex + 2);
+ // 处理SSE事件
+ this.processSSEEvent(eventData, false);
+ eventEndIndex = buffer.indexOf('\n\n');
+ }
+ }
+ console.info("buffer.trim()" + buffer.trim())
+ // 处理最后剩余的数据
+ if (buffer.trim()) {
+ this.processSSEEvent(buffer, true);
+ } else {
+ history.addMessage('bot', "", true);
+ }
+ } catch (error) {
+ console.error('Streaming error:', error);
+ sendMessageError = true;
+ } finally {
+ this.isStreaming = false;
+ if (!sendMessageError) {
+ this.currentBotMessage = null;
+ this.markdownBuffer = '';
+ }
+
+ }
+ }
+
+ async streamResubscribeResponse(query, userId, sessionId) {
+ this.isStreaming = true;
+ if (this.currentBotMessage === null) {
+ this.currentBotMessage = this.addMessage('', 'bot');
+ }
+ try {
+ const response = await
fetch(`/resubscribeStream?question=${encodeURIComponent(query)}&userId=${userId}&sessionId=${sessionId}`,
{
+ method: 'GET',
+ headers: {
+ 'Accept': 'text/event-stream',
+ 'Cache-Control': 'no-cache'
+ }
+ });
+
+ if (!response.ok) {
+ throw new Error(`HTTP error! status: ${response.status}`);
+ }
+
+ const reader = response.body.getReader();
+ const decoder = new TextDecoder();
+ let buffer = '';
+
+ while (true) {
+ const { done, value } = await reader.read();
+ if (done) break;
+
+ buffer += decoder.decode(value, { stream: true });
+
+ // 按SSE协议处理:查找完整的事件
+ let eventEndIndex = buffer.indexOf('\n\n');
+ while (eventEndIndex !== -1) {
+ const eventData = buffer.substring(0, eventEndIndex);
+ buffer = buffer.substring(eventEndIndex + 2);
+ // 处理SSE事件
+ this.processSSEEvent(eventData);
+ eventEndIndex = buffer.indexOf('\n\n');
+ }
+ }
+ // 处理最后剩余的数据
+ if (buffer.trim()) {
+ this.processSSEEvent(buffer);
+ }
+ } catch (error) {
+ console.error('Streaming error:', error);
+ // this.currentBotMessage.innerHTML = '抱歉,连接出现问题,请稍后重试。';
+ } finally {
+ this.isStreaming = false;
+ this.currentBotMessage = null;
+ this.markdownBuffer = '';
+ }
+ }
+
+ processSSEEvent(eventData, isEnd) {
+ const lines = eventData.split('\n');
+ let data = '';
+
+ if (eventData.startsWith('data:')) {
+ data = eventData.replaceAll('data:', '');
+ } else {
+ data = eventData;
+ }
+
+ if (data) {
+ history.addMessage('bot', data, isEnd);
+ // 直接添加数据,不强制添加换行符
+ this.markdownBuffer += data;
+ this.updateMarkdownContent();
+ }
+ }
+
+ updateMarkdownContent() {
+ if (this.currentBotMessage) {
+ try {
+ // 预处理内容,确保换行符被正确处理
+ let processedContent = this.markdownBuffer;
+
+ // 确保内容以换行符结尾,这样Markdown解析器能正确处理
+ if (processedContent && !processedContent.endsWith('\n')) {
+ processedContent += '\n';
+ }
+ // 渲染Markdown内容
+ const htmlContent = marked.parse(processedContent);
+ this.currentBotMessage.innerHTML = htmlContent;
+ this.scrollToBottom();
+ } catch (error) {
+ console.error('Markdown parsing error:', error);
+ // 如果Markdown解析失败,回退到纯文本
+ this.currentBotMessage.textContent = this.markdownBuffer;
+ this.scrollToBottom();
+ }
+ }
+ }
+
+ addMessage(content, type) {
+ const messageDiv = document.createElement('div');
+ messageDiv.className = `message ${type}-message`;
+
+ if (type === 'bot' && content) {
+ // 对于机器人消息,尝试渲染Markdown
+ try {
+ // 预处理内容,确保换行符被正确处理
+ let processedContent = content;
+ if (processedContent && !processedContent.endsWith('\n')) {
+ processedContent += '\n';
+ }
+ const htmlContent = marked.parse(processedContent);
+ messageDiv.innerHTML = htmlContent;
+ } catch (error) {
+ console.error('Markdown parsing error:', error);
+ messageDiv.textContent = content;
+ }
+ } else {
+ messageDiv.textContent = content;
+ }
+ this.chatMessages.appendChild(messageDiv);
+ this.scrollToBottom();
+ return messageDiv;
+ }
+
+ setLoading(loading) {
+ this.sendButton.disabled = loading;
+ this.typingIndicator.style.display = loading ? 'block' : 'none';
+ if (loading) {
+ this.scrollToBottom();
+ }
+ }
+
+ // Clears the stored history and resets the conversation area to the initial greeting bubble.
+ clearChat() {
+ history.clear();
+ this.chatMessages.innerHTML = `
+ <div class="message bot-message">
+ 您好!有什么可以帮助您的吗?擅长处理天气问题与行程安排规划问题,在本例中使用RocketMQ
LiteTopic版本实现多个Agent之间的通讯<br>
+ 询问天气: '杭州明天的天气情况怎么样'<br>
+ 帮忙安排行程: '帮我做一个明天杭州周边自驾游方案'
+ </div>
+ `;
+ }
+
+ handleConfirm() {
+ const userId = this.userIdInput.value.trim();
+ const sessionId = this.sessionIdInput.value.trim();
+
+ // 验证输入
+ if (!userId || !sessionId) {
+ showToast("请填写 UserId 和 SessionId")
+ return;
+ }
+ // 禁用输入框
+ this.userIdInput.disabled = true;
+ this.sessionIdInput.disabled = true;
+ this.confirmButton.disabled = true;
+ // 发起 resubscribeMessage 请求
+ showToast("登录成功")
+ }
+
+ scrollToBottom() {
+ this.chatMessages.scrollTop = this.chatMessages.scrollHeight;
+ }
+
+ async closeStream() {
+ try {
+ const userId = this.userIdInput.value || defaultUserId;
+ const sessionId = this.sessionIdInput.value ||
defaultSessionId;
+ const response = await
fetch(`/closeStream?userId=${encodeURIComponent(userId)}&sessionId=${encodeURIComponent(sessionId)}`);
+
+ if (!response.ok) {
+ throw new Error(`关闭流失败: ${response.status}`);
+ }
+
+ const result = await response.text(); // 你的后端返回的是字符串
+ console.log('流关闭结果:', result);
+ showToast("断开连接")
+ return result;
+
+ } catch (error) {
+ console.error('关闭流时出错:', error);
+ throw error;
+ }
+ }
+ }
+
+ // 初始化聊天机器人
+ document.addEventListener('DOMContentLoaded', () => {
+ new ChatBot();
+ });
+
+ function showToast(message, type = 'default', duration = 3000) {
+ const container = document.getElementById('toastContainer');
+ if (!container) return;
+
+ const toast = document.createElement('div');
+ toast.className = `toast ${type !== 'default' ? type : ''}`;
+ toast.textContent = message;
+ container.appendChild(toast);
+
+ setTimeout(() => {
+ if (toast.parentNode) toast.remove();
+ }, duration);
+ }
+ window.showToast = showToast;
+
+</script>
+</body>
+</html>
diff --git
a/example/rocketmq-multiagent-base-adk/TravelAgent/src/main/java/agent/AgentExecutorProducer.java
b/example/rocketmq-multiagent-base-adk/TravelAgent/src/main/java/agent/AgentExecutorProducer.java
index 9a7b6c9..88479d7 100644
---
a/example/rocketmq-multiagent-base-adk/TravelAgent/src/main/java/agent/AgentExecutorProducer.java
+++
b/example/rocketmq-multiagent-base-adk/TravelAgent/src/main/java/agent/AgentExecutorProducer.java
@@ -16,8 +16,6 @@
*/
package agent;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -39,6 +37,7 @@ import io.a2a.spec.TaskNotCancelableError;
import io.a2a.spec.TaskState;
import io.a2a.spec.TaskStatus;
import io.a2a.spec.TextPart;
+import io.reactivex.Flowable;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import org.apache.commons.lang3.StringUtils;
@@ -62,13 +61,15 @@ public class AgentExecutorProducer {
}
TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue);
try {
- String response = appCall(userMessage);
- List<String> chunks = splitStringIntoChunks(response, 100);
- for (String chunk : chunks) {
- List<Part<?>> parts = List.of(new TextPart(chunk,
null));
- Thread.sleep(500);
- System.out.println("update artifact!!");
- taskUpdater.addArtifact(parts);
+ Flowable<ApplicationResult> applicationResultFlowable =
appCallStream(userMessage);
+ String lastOutput = "";
+ for (ApplicationResult msg :
applicationResultFlowable.blockingIterable()) {
+ String currentText = msg.getOutput().getText();
+ if (currentText.length() > lastOutput.length()) {
+ List<Part<?>> parts = List.of(new
TextPart(currentText.substring(lastOutput.length()), null));
+ taskUpdater.addArtifact(parts);
+ }
+ lastOutput = currentText;
}
taskUpdater.complete();
} catch (Exception e) {
@@ -122,25 +123,20 @@ public class AgentExecutorProducer {
return result.getOutput().getText();
}
+ /**
+  * Starts a streaming call for the given prompt, using the {@code ApiKey} and {@code AppId}
+  * values of this class.
+  *
+  * @param prompt the prompt passed to the application
+  * @return the {@code Flowable} returned by {@code Application.streamCall}
+  */
+ public static Flowable<ApplicationResult> appCallStream(String prompt) throws ApiException, NoApiKeyException, InputRequiredException {
+     Application application = new Application();
+     ApplicationParam streamParam = ApplicationParam.builder()
+         .apiKey(ApiKey)
+         .appId(AppId)
+         .prompt(prompt)
+         .build();
+     return application.streamCall(streamParam);
+ }
+
private Task createTask(io.a2a.spec.Message request) {
String id = !StringUtils.isEmpty(request.getTaskId()) ?
request.getTaskId() : UUID.randomUUID().toString();
String contextId = !StringUtils.isEmpty(request.getContextId()) ?
request.getContextId() : UUID.randomUUID().toString();
return new Task(id, contextId, new TaskStatus(TaskState.SUBMITTED),
null, List.of(request), null);
}
-
- public static List<String> splitStringIntoChunks(String input, int
maxLength) {
- if (maxLength <= 0) {
- throw new IllegalArgumentException("maxLength must be positive");
- }
- if (StringUtils.isEmpty(input)) {
- return Collections.emptyList();
- }
- List<String> chunks = new ArrayList<>();
- int length = input.length();
- for (int i = 0; i < length; i += maxLength) {
- int end = Math.min(i + maxLength, length);
- chunks.add(input.substring(i, end));
- }
- return chunks;
- }
}
diff --git
a/example/rocketmq-multiagent-base-adk/WeatherAgent/src/main/java/agent/AgentExecutorProducer.java
b/example/rocketmq-multiagent-base-adk/WeatherAgent/src/main/java/agent/AgentExecutorProducer.java
index 3582b4e..b8b9b84 100644
---
a/example/rocketmq-multiagent-base-adk/WeatherAgent/src/main/java/agent/AgentExecutorProducer.java
+++
b/example/rocketmq-multiagent-base-adk/WeatherAgent/src/main/java/agent/AgentExecutorProducer.java
@@ -16,8 +16,6 @@
*/
package agent;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -39,6 +37,7 @@ import io.a2a.spec.TaskNotCancelableError;
import io.a2a.spec.TaskState;
import io.a2a.spec.TaskStatus;
import io.a2a.spec.TextPart;
+import io.reactivex.Flowable;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import org.apache.commons.lang3.StringUtils;
@@ -62,13 +61,15 @@ public class AgentExecutorProducer {
}
TaskUpdater taskUpdater = new TaskUpdater(context, eventQueue);
try {
- String response = appCall(userMessage);
- List<String> chunks = splitStringIntoChunks(response, 100);
- for (String chunk : chunks) {
- List<Part<?>> parts = List.of(new TextPart(chunk,
null));
- Thread.sleep(500);
- System.out.println("update artifact!!");
- taskUpdater.addArtifact(parts);
+ Flowable<ApplicationResult> applicationResultFlowable =
appCallStream(userMessage);
+ String lastOutput = "";
+ for (ApplicationResult msg :
applicationResultFlowable.blockingIterable()) {
+ String currentText = msg.getOutput().getText();
+ if (currentText.length() > lastOutput.length()) {
+ List<Part<?>> parts = List.of(new
TextPart(currentText.substring(lastOutput.length()), null));
+ taskUpdater.addArtifact(parts);
+ }
+ lastOutput = currentText;
}
taskUpdater.complete();
} catch (Exception e) {
@@ -128,20 +129,15 @@ public class AgentExecutorProducer {
return new Task(id, contextId, new TaskStatus(TaskState.SUBMITTED),
null, List.of(request), null);
}
- public static List<String> splitStringIntoChunks(String input, int
maxLength) {
- if (maxLength <= 0) {
- throw new IllegalArgumentException("maxLength must be positive");
- }
- if (StringUtils.isEmpty(input)) {
- return Collections.emptyList();
- }
- List<String> chunks = new ArrayList<>();
- int length = input.length();
- for (int i = 0; i < length; i += maxLength) {
- int end = Math.min(i + maxLength, length);
- chunks.add(input.substring(i, end));
- }
- return chunks;
+ /**
+  * Starts a streaming call for the given prompt, using the {@code ApiKey} and {@code AppId}
+  * values of this class.
+  *
+  * @param prompt the prompt passed to the application
+  * @return the {@code Flowable} returned by {@code Application.streamCall}
+  */
+ public static Flowable<ApplicationResult> appCallStream(String prompt) throws ApiException, NoApiKeyException, InputRequiredException {
+     Application application = new Application();
+     ApplicationParam streamParam = ApplicationParam.builder()
+         .apiKey(ApiKey)
+         .appId(AppId)
+         .prompt(prompt)
+         .build();
+     return application.streamCall(streamParam);
}
}
diff --git a/example/rocketmq-multiagent-base-adk/img_2.png
b/example/rocketmq-multiagent-base-adk/img_2.png
new file mode 100644
index 0000000..125cd1b
Binary files /dev/null and b/example/rocketmq-multiagent-base-adk/img_2.png
differ
diff --git a/example/rocketmq-multiagent-base-adk/img_3.png
b/example/rocketmq-multiagent-base-adk/img_3.png
new file mode 100644
index 0000000..d6c13ce
Binary files /dev/null and b/example/rocketmq-multiagent-base-adk/img_3.png
differ
diff --git a/example/rocketmq-multiagent-base-adk/img_4.png
b/example/rocketmq-multiagent-base-adk/img_4.png
new file mode 100644
index 0000000..d54ac4f
Binary files /dev/null and b/example/rocketmq-multiagent-base-adk/img_4.png
differ
diff --git a/example/rocketmq-multiagent-base-adk/pom.xml
b/example/rocketmq-multiagent-base-adk/pom.xml
index d72aa17..cb9ef63 100644
--- a/example/rocketmq-multiagent-base-adk/pom.xml
+++ b/example/rocketmq-multiagent-base-adk/pom.xml
@@ -29,6 +29,7 @@
<module>WeatherAgent</module>
<module>Common</module>
<module>TravelAgent</module>
+ <module>SupervisorAgent-Web</module>
</modules>
<properties>