This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-a2a.git
The following commit(s) were added to refs/heads/main by this push:
new b21f1cb Optimize the RocketMQTransport (#14)
b21f1cb is described below
commit b21f1cb43bf6578c8350e901464fd9f4f7716305
Author: Drizzle <[email protected]>
AuthorDate: Fri Dec 26 17:17:06 2025 +0800
Optimize the RocketMQTransport (#14)
* update
Change-Id: Icb48afcf44893ad3c07fc0e243e333096d8e3ec0
* optimize the code
Change-Id: Iaa158a2bd2bc55475e7c55e7ae3392b4e2f28fa0
* optimize the code
Change-Id: I9fba77a503e387b5d9feab13cd144021b938ec1f
* update
Change-Id: Ia842ff8df57e018a36c672c9ddc7d1c7f42240f3
* update
Change-Id: Ibfe44c7be6f7a25be6ba74644ac0e9bee9a8ce43
* update
Change-Id: I6ba98dca2dc8001494aec4ddbe4e8bea38f8250a
* update
Change-Id: Ib0f380ba1f073608842c4ad467757368e331bdeb
* update
Change-Id: I3034540314cc832d68a1095fdb5a1366f2b26864
* update
Change-Id: Ie22cf3da240e10ae70800f8c1443134c70c9215f
* optimize the code
Change-Id: I0bc42ef1f8813d4ed538810c27c8e468cda90309
* update
Change-Id: I31f0bfda2a217979be592c533065268f03418951
* update
Change-Id: If3147fde462620d5fa71a71c86a574babcc23589
* update
Change-Id: I5b711a35249a0525a749af7e92bbe4547f9eb910
* update
Change-Id: I6cd9b87e7aeeb85137ba324f79f17b386cf2bef8
* update
Change-Id: I5134715702d491ce02a900e9ca36bc214c20141e
* update
Change-Id: I8031f2fbe0dbd69ec40f23b7a7d6dab3ef53e047
* update
Change-Id: Ic418be82f1e52191d5c2afd7307c64f9962fe943
---------
Co-authored-by: drizzle.zk <[email protected]>
---
.../SupervisorAgent-Web/pom.xml | 2 +-
example/rocketmq-multiagent-base-adk/pom.xml | 2 +-
pom.xml | 8 +-
.../rocketmq/a2a/common/RocketMQA2AConstant.java | 16 +
.../rocketmq/a2a/common/RocketMQResourceInfo.java | 113 ++++
.../apache/rocketmq/a2a/common/RocketMQUtil.java | 352 +++++++++++++
.../a2a/server/RocketMQA2AServerRoutes.java | 285 ++++------
.../rocketmq/a2a/transport/RocketMQTransport.java | 577 ++-------------------
8 files changed, 658 insertions(+), 697 deletions(-)
diff --git a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml
index 374381c..2869d06 100644
--- a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml
+++ b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml
@@ -119,7 +119,7 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-a2a</artifactId>
- <version>1.0.7</version>
+ <version>1.0.8</version>
</dependency>
</dependencies>
diff --git a/example/rocketmq-multiagent-base-adk/pom.xml b/example/rocketmq-multiagent-base-adk/pom.xml
index a92d43f..850a237 100644
--- a/example/rocketmq-multiagent-base-adk/pom.xml
+++ b/example/rocketmq-multiagent-base-adk/pom.xml
@@ -81,7 +81,7 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-a2a</artifactId>
- <version>1.0.7</version>
+ <version>1.0.8</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
diff --git a/pom.xml b/pom.xml
index 37b4301..0fea3a5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-a2a</artifactId>
- <version>1.0.8-SNAPSHOT</version>
+ <version>1.0.8</version>
<name>Apache RocketMQ A2A ${project.version}</name>
<description>Integrate Apache RocketMQ with A2A</description>
@@ -141,6 +141,12 @@
<artifactId>slf4j-api</artifactId>
<version>${slfj-version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-a2a</artifactId>
+ <version>3.0.1-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/src/main/java/org/apache/rocketmq/a2a/common/RocketMQA2AConstant.java b/src/main/java/org/apache/rocketmq/a2a/common/RocketMQA2AConstant.java
index fd428f0..a55c453 100644
--- a/src/main/java/org/apache/rocketmq/a2a/common/RocketMQA2AConstant.java
+++ b/src/main/java/org/apache/rocketmq/a2a/common/RocketMQA2AConstant.java
@@ -16,7 +16,23 @@
*/
package org.apache.rocketmq.a2a.common;
+import com.fasterxml.jackson.core.type.TypeReference;
+import io.a2a.spec.CancelTaskResponse;
+import io.a2a.spec.GetAuthenticatedExtendedCardResponse;
+import io.a2a.spec.GetTaskPushNotificationConfigResponse;
+import io.a2a.spec.GetTaskResponse;
+import io.a2a.spec.ListTaskPushNotificationConfigResponse;
+import io.a2a.spec.SendMessageResponse;
+import io.a2a.spec.SetTaskPushNotificationConfigResponse;
+
public class RocketMQA2AConstant {
+ public static final TypeReference<SendMessageResponse>
SEND_MESSAGE_RESPONSE_REFERENCE = new TypeReference<>() { };
+ public static final TypeReference<GetTaskResponse>
GET_TASK_RESPONSE_REFERENCE = new TypeReference<>() { };
+ public static final TypeReference<CancelTaskResponse>
CANCEL_TASK_RESPONSE_REFERENCE = new TypeReference<>() { };
+ public static final TypeReference<GetTaskPushNotificationConfigResponse>
GET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {
};
+ public static final TypeReference<SetTaskPushNotificationConfigResponse>
SET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {
};
+ public static final TypeReference<ListTaskPushNotificationConfigResponse>
LIST_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {
};
+ public static final TypeReference<GetAuthenticatedExtendedCardResponse>
GET_AUTHENTICATED_EXTENDED_CARD_RESPONSE_REFERENCE = new TypeReference<>() { };
public static final String HTTP_URL_PREFIX = "http://";
public static final String HTTPS_URL_PREFIX = "https://";
public static final String ROCKETMQ_PROTOCOL = "RocketMQ";
diff --git a/src/main/java/org/apache/rocketmq/a2a/common/RocketMQResourceInfo.java b/src/main/java/org/apache/rocketmq/a2a/common/RocketMQResourceInfo.java
new file mode 100644
index 0000000..0ad46cb
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/a2a/common/RocketMQResourceInfo.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.a2a.common;
+
+import java.util.List;
+
+import com.alibaba.fastjson.JSON;
+import io.a2a.spec.AgentCard;
+import io.a2a.spec.AgentInterface;
+import org.apache.rocketmq.shaded.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.HTTPS_URL_PREFIX;
+import static
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.HTTP_URL_PREFIX;
+
+public class RocketMQResourceInfo {
+ private static final Logger log =
LoggerFactory.getLogger(RocketMQResourceInfo.class);
+ private String namespace;
+ private String endpoint;
+ private String topic;
+
+ public RocketMQResourceInfo(String endpoint, String topic) {
+ this.endpoint = endpoint;
+ this.topic = topic;
+ }
+
+ public RocketMQResourceInfo() {}
+
+ public String getEndpoint() {
+ return endpoint;
+ }
+
+ public void setEndpoint(String endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public String getNamespace() {
+ return namespace;
+ }
+
+ public void setNamespace(String namespace) {
+ this.namespace = namespace;
+ }
+
+ public static RocketMQResourceInfo parseAgentCardAddition(AgentCard
agentCard) {
+ if (null == agentCard ||
StringUtils.isEmpty(agentCard.preferredTransport()) ||
StringUtils.isEmpty(agentCard.url()) || null ==
agentCard.additionalInterfaces() || agentCard.additionalInterfaces().isEmpty())
{
+ log.error("parseAgentCardAddition param error, agentCard: {}",
JSON.toJSONString(agentCard));
+ return null;
+ }
+ RocketMQResourceInfo rocketMQResourceInfo = null;
+ String preferredTransport = agentCard.preferredTransport();
+ if (RocketMQA2AConstant.ROCKETMQ_PROTOCOL.equals(preferredTransport)) {
+ String url = agentCard.url();
+ rocketMQResourceInfo = pareAgentCardUrl(url);
+ if (null != rocketMQResourceInfo &&
!StringUtils.isEmpty(rocketMQResourceInfo.getEndpoint()) &&
!StringUtils.isEmpty(rocketMQResourceInfo.getTopic())) {
+ log.info("RocketMQTransport get rocketMQResourceInfo from
preferredTransport");
+ return rocketMQResourceInfo;
+ }
+ }
+ List<AgentInterface> agentInterfaces =
agentCard.additionalInterfaces();
+ for (AgentInterface agentInterface : agentInterfaces) {
+ String transport = agentInterface.transport();
+ if (!StringUtils.isEmpty(transport) &&
RocketMQA2AConstant.ROCKETMQ_PROTOCOL.equals(transport)) {
+ String url = agentInterface.url();
+ rocketMQResourceInfo = pareAgentCardUrl(url);
+ if (null != rocketMQResourceInfo &&
!StringUtils.isEmpty(rocketMQResourceInfo.getEndpoint()) &&
!StringUtils.isEmpty(rocketMQResourceInfo.getTopic())) {
+ log.error("RocketMQTransport get rocketMQResourceInfo from
additionalInterfaces");
+ return rocketMQResourceInfo;
+ }
+ }
+ }
+ return null;
+ }
+
+ public static RocketMQResourceInfo pareAgentCardUrl(String agentCardUrl) {
+ if (StringUtils.isEmpty(agentCardUrl)) {
+ return null;
+ }
+ String agentUrl = agentCardUrl.replace(HTTP_URL_PREFIX, "");
+ String replaceFinal = agentUrl.replace(HTTPS_URL_PREFIX, "");
+ String[] split = replaceFinal.split("/");
+ if (split.length != 3) {
+ return null;
+ }
+ RocketMQResourceInfo rocketMQResourceInfo = new RocketMQResourceInfo();
+ rocketMQResourceInfo.setEndpoint(split[0].trim());
+ rocketMQResourceInfo.setNamespace(split[1].trim());
+ rocketMQResourceInfo.setTopic(split[2].trim());
+ return rocketMQResourceInfo;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/a2a/common/RocketMQUtil.java b/src/main/java/org/apache/rocketmq/a2a/common/RocketMQUtil.java
new file mode 100644
index 0000000..3d469dc
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/a2a/common/RocketMQUtil.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.a2a.common;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import com.alibaba.fastjson.JSON;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import io.a2a.client.transport.jsonrpc.sse.SSEEventListener;
+import io.a2a.client.transport.spi.interceptors.PayloadAndHeaders;
+import io.a2a.spec.A2AClientException;
+import io.a2a.spec.JSONRPCError;
+import io.a2a.spec.JSONRPCResponse;
+import io.a2a.util.Utils;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
+import org.apache.rocketmq.client.apis.consumer.LitePushConsumer;
+import org.apache.rocketmq.client.apis.consumer.MessageListener;
+import org.apache.rocketmq.client.apis.consumer.PushConsumer;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.apache.rocketmq.shaded.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static io.a2a.util.Utils.OBJECT_MAPPER;
+import static org.apache.rocketmq.a2a.common.RocketMQA2AConstant.DATA_PREFIX;
+
+public class RocketMQUtil {
+ private static final Logger log =
LoggerFactory.getLogger(RocketMQUtil.class);
+ public static final ConcurrentMap<String /* namespace */, Map<String /*
WorkerAgentResponseTopic */, LitePushConsumer>> ROCKETMQ_CONSUMER_MAP = new
ConcurrentHashMap<>();
+ public static final ConcurrentMap<String /* namespace */, Map<String /*
agentTopic */, Producer>> ROCKETMQ_PRODUCER_MAP = new ConcurrentHashMap<>();
+ public static final ConcurrentMap<String /* namespace */, Map<String /*
msgId */, CompletableFuture<String>>> MESSAGE_RESPONSE_MAP = new
ConcurrentHashMap<>();
+ public static final ConcurrentMap<String /* namespace */, Map<String /*
msgId */, SSEEventListener>> MESSAGE_STREAM_RESPONSE_MAP = new
ConcurrentHashMap<>();
+ public static final ConcurrentMap<String /* namespace */, Map<String /*
liteTopic */, Boolean>> LITE_TOPIC_USE_DEFAULT_RECOVER_MAP = new
ConcurrentHashMap<>();
+ public static final ConcurrentMap<String /* namespace */, Map<String /*
Key */, SSEEventListener>> RECOVER_MESSAGE_STREAM_RESPONSE_MAP = new
ConcurrentHashMap<>();
+
+ public static void checkConfigParam(String endpoint, String
workAgentResponseTopic, String workAgentResponseGroupID, String liteTopic,
String agentTopic) {
+ if (StringUtils.isEmpty(endpoint) ||
StringUtils.isEmpty(workAgentResponseTopic) ||
StringUtils.isEmpty(workAgentResponseGroupID) || StringUtils.isEmpty(liteTopic)
|| StringUtils.isEmpty(agentTopic)) {
+ if (StringUtils.isEmpty(endpoint)) {
+ log.error("checkRocketMQConfigParam endpoint is empty");
+ }
+ if (StringUtils.isEmpty(workAgentResponseTopic)) {
+ log.error("checkRocketMQConfigParam workAgentResponseTopic is
empty");
+ }
+ if (StringUtils.isEmpty(workAgentResponseGroupID)) {
+ log.error("checkRocketMQConfigParam workAgentResponseGroupID
is empty");
+ }
+ if (StringUtils.isEmpty(liteTopic)) {
+ log.error("checkRocketMQConfigParam liteTopic is empty");
+ }
+ if (StringUtils.isEmpty(agentTopic)) {
+ log.error("checkRocketMQConfigParam agentTopic is empty");
+ }
+ throw new RuntimeException("checkRocketMQConfigParam error, init
failed !!!");
+ }
+ }
+
+ public static Producer initAndGetProducer(String namespace, String
endpoint, String accessKey, String secretKey, String agentTopic) throws
ClientException {
+ if (null == namespace || StringUtils.isEmpty(endpoint) ||
StringUtils.isEmpty(agentTopic)) {
+ log.error("initAndGetProducer param error, namespace: {},
endpoint: {}, agentTopic: {}", namespace, endpoint, agentTopic);
+ }
+ Map<String, Producer> producerMap =
ROCKETMQ_PRODUCER_MAP.computeIfAbsent(namespace, k -> new HashMap<>());
+ return producerMap.computeIfAbsent(agentTopic, k -> {
+ try {
+ return buildProducer(namespace, endpoint, accessKey,
secretKey, k);
+ } catch (ClientException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ public static Producer buildProducer(String namespace, String endpoint,
String accessKey, String secretKey, String... topics) throws ClientException {
+ if (null == namespace || StringUtils.isEmpty(endpoint)) {
+ log.error("buildProducer param error, endpoint: {}", endpoint);
+ return null;
+ }
+ final ClientServiceProvider provider =
ClientServiceProvider.loadService();
+ SessionCredentialsProvider sessionCredentialsProvider = new
StaticSessionCredentialsProvider(accessKey, secretKey);
+ ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
+ .setEndpoints(endpoint)
+ .setNamespace(namespace)
+ .setCredentialProvider(sessionCredentialsProvider)
+ .setRequestTimeout(Duration.ofSeconds(15))
+ .build();
+ final ProducerBuilder builder = provider.newProducerBuilder()
+ .setClientConfiguration(clientConfiguration)
+ .setTopics(topics);
+ return builder.build();
+ }
+
+ public static LitePushConsumer initAndGetConsumer(String namespace, String
endpoint, String accessKey, String secretKey, String workAgentResponseTopic,
String workAgentResponseGroupID, String liteTopic) throws ClientException {
+ if (null == namespace || StringUtils.isEmpty(endpoint) ||
StringUtils.isEmpty(workAgentResponseTopic) ||
StringUtils.isEmpty(workAgentResponseGroupID) ||
StringUtils.isEmpty(liteTopic)) {
+ log.error("initAndGetConsumer param error, namespace: {},
endpoint: {}, workAgentResponseTopic: {}, " + "workAgentResponseGroupID: {},
liteTopic: {}", namespace, endpoint, workAgentResponseTopic,
workAgentResponseGroupID, liteTopic);
+ return null;
+ }
+ Map<String, LitePushConsumer> consumerMap =
ROCKETMQ_CONSUMER_MAP.computeIfAbsent(namespace, k -> new HashMap<>());
+ LitePushConsumer litePushConsumer =
consumerMap.computeIfAbsent(workAgentResponseTopic, k -> {
+ try {
+ return buildConsumer(endpoint, namespace, accessKey,
secretKey, workAgentResponseGroupID, workAgentResponseTopic);
+ } catch (ClientException e) {
+ log.error("RocketMQTransport initRocketMQProducerAndConsumer
buildConsumer error: {}", e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+ if (null != litePushConsumer) {
+ litePushConsumer.subscribeLite(liteTopic);
+ }
+ return litePushConsumer;
+ }
+
+ //todo
+ public static LitePushConsumer buildConsumer(String endpoint, String
namespace, String accessKey, String secretKey, String workAgentResponseGroupID,
String workAgentResponseTopic) throws ClientException {
+ if (StringUtils.isEmpty(endpoint) ||
StringUtils.isEmpty(workAgentResponseGroupID) ||
StringUtils.isEmpty(workAgentResponseTopic)) {
+ log.error("RocketMQTransport buildConsumer check param error");
+ return null;
+ }
+ final ClientServiceProvider provider =
ClientServiceProvider.loadService();
+ SessionCredentialsProvider sessionCredentialsProvider = new
StaticSessionCredentialsProvider(accessKey, secretKey);
+ ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
+ .setEndpoints(endpoint)
+ .setNamespace(namespace)
+ .setCredentialProvider(sessionCredentialsProvider)
+ .build();
+ return provider.newLitePushConsumerBuilder()
+ .setClientConfiguration(clientConfiguration)
+ .setConsumerGroup(workAgentResponseGroupID)
+ .bindTopic(workAgentResponseTopic)
+ .setMessageListener(messageView -> {
+ try {
+ Optional<String> liteTopicOpt = messageView.getLiteTopic();
+ String liteTopic = liteTopicOpt.get();
+ if (StringUtils.isEmpty(liteTopic)) {
+ log.error("RocketMQTransport buildConsumer liteTopic
is empty");
+ return ConsumeResult.SUCCESS;
+ }
+ byte[] result = new
byte[messageView.getBody().remaining()];
+ messageView.getBody().get(result);
+ String resultStr = new String(result,
StandardCharsets.UTF_8);
+ RocketMQResponse response = JSON.parseObject(resultStr,
RocketMQResponse.class);
+ if (null == response ||
StringUtils.isEmpty(response.getMessageId())) {
+ log.error("RocketMQTransport litePushConsumer consumer
error, response is null or messageId is empty");
+ return ConsumeResult.SUCCESS;
+ }
+ if (!response.isStream()) {
+ return dealNonStreamResult(response, namespace);
+ }
+ return dealStreamResult(response, namespace, liteTopic);
+ } catch (Exception e) {
+ log.error("RocketMQTransport litePushConsumer consumer
error, msgId: {}, error: {}", messageView.getMessageId(), e.getMessage());
+ return ConsumeResult.SUCCESS;
+ }
+ }).build();
+ }
+
+ public static PushConsumer buildConsumer(String endpoint, String
namespace, String accessKey, String secretKey, String bizGroup, String
bizTopic, MessageListener messageListener) throws ClientException {
+ final ClientServiceProvider provider =
ClientServiceProvider.loadService();
+ SessionCredentialsProvider sessionCredentialsProvider = new
StaticSessionCredentialsProvider(accessKey, secretKey);
+ ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
+ .setEndpoints(endpoint)
+ .setNamespace(namespace)
+ .setCredentialProvider(sessionCredentialsProvider)
+ .build();
+ String tag = "*";
+ return provider.newPushConsumerBuilder()
+ .setClientConfiguration(clientConfiguration)
+ .setConsumerGroup(bizGroup)
+ .setSubscriptionExpressions(Collections.singletonMap(bizTopic, new
FilterExpression(tag, FilterExpressionType.TAG)))
+ .setMessageListener(messageListener).build();
+ }
+
+ public static Message buildMessage(String topic, String liteTopic,
RocketMQResponse response) {
+ if (StringUtils.isEmpty(topic) || StringUtils.isEmpty(liteTopic)) {
+ log.error("RocketMQA2AServerRoutes buildMessage param error,
topic: {}, liteTopic: {}, response: {}", topic, liteTopic,
JSON.toJSONString(response));
+ return null;
+ }
+ String missionJsonStr = JSON.toJSONString(response);
+ final ClientServiceProvider provider =
ClientServiceProvider.loadService();
+ return provider.newMessageBuilder()
+ .setTopic(topic)
+ .setBody(missionJsonStr.getBytes(StandardCharsets.UTF_8))
+ .setLiteTopic(liteTopic)
+ .build();
+ }
+
+ public static String sendRocketMQRequest(PayloadAndHeaders
payloadAndHeaders, String agentTopic, String liteTopic, String
workAgentResponseTopic, Producer producer) throws JsonProcessingException {
+ if (null == payloadAndHeaders || StringUtils.isEmpty(agentTopic) ||
StringUtils.isEmpty(liteTopic) || StringUtils.isEmpty(workAgentResponseTopic)
|| null == producer) {
+ log.error("RocketMQTransport sendRocketMQRequest param error,
payloadAndHeaders: {}, agentTopic: {}, workAgentResponseTopic: {}, liteTopic:
{}, producer: {}", payloadAndHeaders, agentTopic, workAgentResponseTopic,
liteTopic, producer);
+ return null;
+ }
+ RocketMQRequest request = new RocketMQRequest();
+
request.setRequestBody(Utils.OBJECT_MAPPER.writeValueAsString(payloadAndHeaders.getPayload()));
+ request.setAgentTopic(agentTopic);
+ request.setWorkAgentResponseTopic(workAgentResponseTopic);
+ request.setLiteTopic(liteTopic);
+ if (payloadAndHeaders.getHeaders() != null) {
+ for (Map.Entry<String, String> entry :
payloadAndHeaders.getHeaders().entrySet()) {
+ request.addHeader(entry.getKey(), entry.getValue());
+ }
+ }
+ String messageBodyStr = serialText(request);
+ if (StringUtils.isEmpty(messageBodyStr)) {
+ return null;
+ }
+ final ClientServiceProvider provider =
ClientServiceProvider.loadService();
+ byte[] body = messageBodyStr.getBytes(StandardCharsets.UTF_8);
+ final Message message =
provider.newMessageBuilder().setTopic(agentTopic).setBody(body).build();
+ try {
+ final SendReceipt sendReceipt = producer.send(message);
+ if (!StringUtils.isEmpty(sendReceipt.getMessageId().toString())) {
+ return sendReceipt.getMessageId().toString();
+ }
+ } catch (Throwable t) {
+ log.error("sendRocketMQRequest send message failed, error: {}",
t.getMessage());
+ }
+ return null;
+ }
+
+ private static ConsumeResult dealStreamResult(RocketMQResponse response,
String namespace, String liteTopic) {
+ if (null == response || StringUtils.isEmpty(response.getMessageId())
|| StringUtils.isEmpty(liteTopic) || !response.isEnd() &&
StringUtils.isEmpty(response.getResponseBody())) {
+ log.error("RocketMQTransport dealStreamResult param is error,
response: {}, liteTopic: {}", JSON.toJSONString(response), liteTopic);
+ return ConsumeResult.SUCCESS;
+ }
+ Map<String, SSEEventListener> sseEventListenerMap =
MESSAGE_STREAM_RESPONSE_MAP.get(namespace);
+ if (null == sseEventListenerMap) {
+ return ConsumeResult.SUCCESS;
+ }
+ SSEEventListener sseEventListener =
sseEventListenerMap.get(response.getMessageId());
+ if (null == sseEventListener) {
+ Map<String, Boolean> booleanMap =
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.get(namespace);
+ if (null == booleanMap ||
!Boolean.TRUE.equals(booleanMap.get(liteTopic))) {
+ return ConsumeResult.SUCCESS;
+ }
+ if (!RECOVER_MESSAGE_STREAM_RESPONSE_MAP.isEmpty() &&
RECOVER_MESSAGE_STREAM_RESPONSE_MAP.containsKey(namespace)) {
+ Map<String, SSEEventListener> sseEventListenerMapRecover =
RECOVER_MESSAGE_STREAM_RESPONSE_MAP.get(namespace);
+ if (null == sseEventListenerMapRecover) {
+ return ConsumeResult.SUCCESS;
+ }
+ sseEventListener =
sseEventListenerMapRecover.get(RocketMQA2AConstant.DEFAULT_STREAM_RECOVER);
+ if (null == sseEventListener) {
+ return ConsumeResult.SUCCESS;
+ }
+ }
+ if (null == sseEventListener) {
+ return ConsumeResult.SUCCESS;
+ }
+ }
+ String item = response.getResponseBody();
+ if (!StringUtils.isEmpty(item) && item.startsWith(DATA_PREFIX)) {
+ item = item.substring(5).trim();
+ if (!item.isEmpty()) {
+ try {
+ sseEventListener.onMessage(item, new
CompletableFuture<>());
+ } catch (Throwable e) {
+ log.error("RocketMQTransport dealStreamResult error: {}",
e.getMessage());
+ return ConsumeResult.FAILURE;
+ }
+ }
+ if (response.isEnd() &&
!StringUtils.isEmpty(response.getMessageId())) {
+ sseEventListenerMap.remove(response.getMessageId());
+ }
+ }
+ return ConsumeResult.SUCCESS;
+ }
+
+ private static ConsumeResult dealNonStreamResult(RocketMQResponse
response, String namespace) {
+ if (null == response || StringUtils.isEmpty(response.getMessageId())
|| StringUtils.isEmpty(response.getResponseBody())) {
+ log.error("RocketMQTransport dealNonStreamResult param is error,
response: {}", JSON.toJSONString(response));
+ return ConsumeResult.SUCCESS;
+ }
+ Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.get(namespace);
+ if (null != completableFutureMap &&
completableFutureMap.containsKey(response.getMessageId())) {
+ CompletableFuture<String> completableFuture =
completableFutureMap.get(response.getMessageId());
+ completableFuture.complete(response.getResponseBody());
+ }
+ return ConsumeResult.SUCCESS;
+ }
+
+ public static String getResult(String responseMessageId, String namespace)
throws ExecutionException, InterruptedException, TimeoutException {
+ if (StringUtils.isEmpty(responseMessageId)) {
+ throw new RuntimeException("responseMessageId is null");
+ }
+ Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(namespace, k -> new HashMap<>());
+ CompletableFuture<String> objectCompletableFuture = new
CompletableFuture<>();
+ completableFutureMap.put(responseMessageId, objectCompletableFuture);
+ String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
+ completableFutureMap.remove(responseMessageId);
+ return result;
+ }
+
+ public static <T extends JSONRPCResponse<?>> T unmarshalResponse(String
response, TypeReference<T> typeReference)
+ throws A2AClientException, JsonProcessingException {
+ T value = Utils.unmarshalFrom(response, typeReference);
+ JSONRPCError error = value.getError();
+ if (error != null) {
+ throw new A2AClientException(error.getMessage() + (error.getData()
!= null ? ": " + error.getData() : ""), error);
+ }
+ return value;
+ }
+
+ public static String toJsonString(Object o) {
+ if (null == o) {
+ log.error("toJsonString param is null");
+ return null;
+ }
+ try {
+ return OBJECT_MAPPER.writeValueAsString(o);
+ } catch (JsonProcessingException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ public static String serialText(RocketMQRequest rocketMQRequest) {
+ if (null == rocketMQRequest ||
StringUtils.isEmpty(rocketMQRequest.getRequestBody()) ||
StringUtils.isEmpty(rocketMQRequest.getWorkAgentResponseTopic()) ||
StringUtils.isEmpty(rocketMQRequest.getLiteTopic()) ||
StringUtils.isEmpty(rocketMQRequest.getAgentTopic())) {
+ log.error("serialText param error rocketMQRequest: {}",
JSON.toJSONString(rocketMQRequest));
+ return null;
+ }
+ return JSON.toJSONString(rocketMQRequest);
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/a2a/server/RocketMQA2AServerRoutes.java b/src/main/java/org/apache/rocketmq/a2a/server/RocketMQA2AServerRoutes.java
index 321bc68..976261c 100644
--- a/src/main/java/org/apache/rocketmq/a2a/server/RocketMQA2AServerRoutes.java
+++ b/src/main/java/org/apache/rocketmq/a2a/server/RocketMQA2AServerRoutes.java
@@ -17,8 +17,6 @@
package org.apache.rocketmq.a2a.server;
import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
@@ -27,7 +25,9 @@ import
java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
+
import com.alibaba.fastjson.JSON;
+
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.io.JsonEOFException;
@@ -70,24 +70,22 @@ import io.vertx.ext.web.RoutingContext;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
-import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
-import org.apache.rocketmq.client.apis.ClientServiceProvider;
-import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
-import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
-import org.apache.rocketmq.client.apis.consumer.FilterExpression;
-import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
+import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
-import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
-import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.shaded.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import static io.a2a.util.Utils.OBJECT_MAPPER;
import static org.apache.rocketmq.a2a.common.RocketMQA2AConstant.METHOD;
+import static org.apache.rocketmq.a2a.common.RocketMQUtil.buildConsumer;
+import static org.apache.rocketmq.a2a.common.RocketMQUtil.buildMessage;
+import static org.apache.rocketmq.a2a.common.RocketMQUtil.buildProducer;
+import static org.apache.rocketmq.a2a.common.RocketMQUtil.toJsonString;
@Startup
@Singleton
@@ -99,159 +97,115 @@ public class RocketMQA2AServerRoutes extends
A2AServerRoutes {
private static final String BIZ_CONSUMER_GROUP =
System.getProperty("bizConsumerGroup", "");
private static final String ACCESS_KEY = System.getProperty("rocketMQAK",
"");
private static final String SECRET_KEY = System.getProperty("rocketMQSK",
"");
-
- @Inject
- JSONRPCHandler jsonRpcHandler;
-
private static volatile Runnable
streamingMultiSseSupportSubscribedRunnable;
-
- private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
- 6,
- 6,
- 60, TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(10_0000),
- new CallerRunsPolicy()
- );
-
+ private final ThreadPoolExecutor executor = new ThreadPoolExecutor(6, 6,
60, TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(10_0000), new CallerRunsPolicy());
private Producer producer;
private PushConsumer pushConsumer;
private MultiSseSupport multiSseSupport;
+ @Inject
+ JSONRPCHandler jsonRpcHandler;
+
@PostConstruct
public void init() {
try {
checkConfigParam();
- this.producer = buildProducer();
- this.pushConsumer = buildConsumer();
+ this.producer = buildProducer(ROCKETMQ_NAMESPACE,
ROCKETMQ_ENDPOINT, ACCESS_KEY, SECRET_KEY);
+ this.pushConsumer = buildConsumer(ROCKETMQ_ENDPOINT,
ROCKETMQ_NAMESPACE, ACCESS_KEY, SECRET_KEY,
+ BIZ_CONSUMER_GROUP, BIZ_TOPIC, buildMessageListener());
this.multiSseSupport = new MultiSseSupport(this.producer);
log.info("RocketMQA2AServerRoutes init success");
} catch (Exception e) {
log.error("RocketMQA2AServerRoutes error: {}", e.getMessage());
}
}
- private Producer buildProducer() throws ClientException {
- final ClientServiceProvider provider =
ClientServiceProvider.loadService();
- SessionCredentialsProvider sessionCredentialsProvider = new
StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY);
- ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
- .setEndpoints(ROCKETMQ_ENDPOINT)
- .setNamespace(ROCKETMQ_NAMESPACE)
- .setCredentialProvider(sessionCredentialsProvider)
- .setRequestTimeout(Duration.ofSeconds(15))
- .build();
- final ProducerBuilder builder =
provider.newProducerBuilder().setClientConfiguration(clientConfiguration);
- return builder.build();
- }
- private PushConsumer buildConsumer() throws ClientException {
- final ClientServiceProvider provider =
ClientServiceProvider.loadService();
- SessionCredentialsProvider sessionCredentialsProvider = new
StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY);
- ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
- .setEndpoints(ROCKETMQ_ENDPOINT)
- .setNamespace(ROCKETMQ_NAMESPACE)
- .setCredentialProvider(sessionCredentialsProvider)
- .build();
- String tag = "*";
- FilterExpression filterExpression = new FilterExpression(tag,
FilterExpressionType.TAG);
- PushConsumer consumer = provider.newPushConsumerBuilder()
- .setClientConfiguration(clientConfiguration)
- .setConsumerGroup(BIZ_CONSUMER_GROUP)
- .setSubscriptionExpressions(Collections.singletonMap(BIZ_TOPIC,
filterExpression))
- .setMessageListener(messageView -> {
- CompletableFuture<Boolean> completableFuture = null;
+ private MessageListener buildMessageListener() {
+ return messageView -> {
+ CompletableFuture<Boolean> completableFuture = null;
+ try {
+ byte[] result = new byte[messageView.getBody().remaining()];
+ messageView.getBody().get(result);
+ String messageStr = new String(result, StandardCharsets.UTF_8);
+ RocketMQRequest request = JSON.parseObject(messageStr,
RocketMQRequest.class);
+ boolean streaming = false;
+ String body = request.getRequestBody();
+ JSONRPCResponse<?> nonStreamingResponse = null;
+ Multi<? extends JSONRPCResponse<?>> streamingResponse = null;
+ JSONRPCErrorResponse error = null;
try {
- byte[] result = new
byte[messageView.getBody().remaining()];
- messageView.getBody().get(result);
- String messageStr = new String(result,
StandardCharsets.UTF_8);
- RocketMQRequest request = JSON.parseObject(messageStr,
RocketMQRequest.class);
- boolean streaming = false;
- String body = request.getRequestBody();
- JSONRPCResponse<?> nonStreamingResponse = null;
- Multi<? extends JSONRPCResponse<?>> streamingResponse =
null;
- JSONRPCErrorResponse error = null;
- try {
- JsonNode node = OBJECT_MAPPER.readTree(body);
- JsonNode method = node != null ? node.get(METHOD) :
null;
- streaming = method != null &&
(SendStreamingMessageRequest.METHOD.equals(method.asText()) ||
TaskResubscriptionRequest.METHOD.equals(method.asText()));
- if (streaming) {
- StreamingJSONRPCRequest<?> streamingJSONRPCRequest
= OBJECT_MAPPER.treeToValue(node, StreamingJSONRPCRequest.class);
- streamingResponse =
processStreamingRequest(streamingJSONRPCRequest, null);
- } else {
- NonStreamingJSONRPCRequest<?>
nonStreamingJSONRPCRequest = OBJECT_MAPPER.treeToValue(node,
NonStreamingJSONRPCRequest.class);
- nonStreamingResponse =
processNonStreamingRequest(nonStreamingJSONRPCRequest, null);
- }
- } catch (JsonProcessingException e) {
- error = handleError(e);
- } catch (Throwable t) {
- error = new JSONRPCErrorResponse(new
InternalError(t.getMessage()));
- } finally {
- RocketMQResponse response = null;
- if (error != null) {
- response = new RocketMQResponse();
- response.setEnd(true);
- response.setStream(false);
- response.setLiteTopic(request.getLiteTopic());
- response.setContextId(response.getContextId());
- response.setResponseBody(JSON.toJSONString(error));
-
response.setMessageId(messageView.getMessageId().toString());
- } else if (streaming) {
- final Multi<? extends JSONRPCResponse<?>>
finalStreamingResponse = streamingResponse;
- log.info("RocketMQA2AServerRoutes streaming
finalStreamingResponse: {}", JSON.toJSONString(finalStreamingResponse));
- completableFuture = new CompletableFuture<>();
- CompletableFuture<Boolean> finalCompletableFuture
= completableFuture;
- this.executor.execute(() -> {
-
this.multiSseSupport.subscribeObjectRocketmq(finalStreamingResponse.map(i ->
(Object)i), null, request.getWorkAgentResponseTopic(), request.getLiteTopic(),
messageView.getMessageId().toString(), finalCompletableFuture);
- });
- } else {
- response = new RocketMQResponse();
- response.setEnd(true);
- response.setStream(false);
- response.setLiteTopic(request.getLiteTopic());
- response.setContextId(response.getContextId());
-
response.setMessageId(messageView.getMessageId().toString());
-
response.setResponseBody(toJsonString(nonStreamingResponse));
- }
- if (null != response) {
- SendReceipt send =
this.producer.send(buildMessage(request.getWorkAgentResponseTopic(),
request.getLiteTopic(), response));
- log.info("RocketMQA2AServerRoutes send
nonStreamingResponse success, msgId: {}, time: {}, response: {}",
send.getMessageId(), System.currentTimeMillis(), JSON.toJSONString(response));
- }
+ JsonNode node = OBJECT_MAPPER.readTree(body);
+ JsonNode method = node != null ? node.get(METHOD) : null;
+ streaming = method != null &&
(SendStreamingMessageRequest.METHOD.equals(method.asText())
+ ||
TaskResubscriptionRequest.METHOD.equals(method.asText()));
+ if (streaming) {
+ StreamingJSONRPCRequest<?> streamingJSONRPCRequest =
OBJECT_MAPPER.treeToValue(node,
+ StreamingJSONRPCRequest.class);
+ streamingResponse =
processStreamingRequest(streamingJSONRPCRequest, null);
+ } else {
+ NonStreamingJSONRPCRequest<?>
nonStreamingJSONRPCRequest = OBJECT_MAPPER.treeToValue(node,
+ NonStreamingJSONRPCRequest.class);
+ nonStreamingResponse =
processNonStreamingRequest(nonStreamingJSONRPCRequest, null);
+ }
+ } catch (JsonProcessingException e) {
+ error = handleError(e);
+ } catch (Throwable t) {
+ error = new JSONRPCErrorResponse(new
InternalError(t.getMessage()));
+ } finally {
+ RocketMQResponse response = null;
+ if (error != null) {
+ response = new RocketMQResponse();
+ response.setEnd(true);
+ response.setStream(false);
+ response.setLiteTopic(request.getLiteTopic());
+ response.setResponseBody(JSON.toJSONString(error));
+
response.setMessageId(messageView.getMessageId().toString());
+ } else if (streaming) {
+ final Multi<? extends JSONRPCResponse<?>>
finalStreamingResponse = streamingResponse;
+ log.info("RocketMQA2AServerRoutes streaming
finalStreamingResponse: {}",
+ JSON.toJSONString(finalStreamingResponse));
+ completableFuture = new CompletableFuture<>();
+ CompletableFuture<Boolean> finalCompletableFuture =
completableFuture;
+ this.executor.execute(() -> {
+
this.multiSseSupport.subscribeObjectRocketmq(finalStreamingResponse.map(i ->
(Object)i),
+ null, request.getWorkAgentResponseTopic(),
request.getLiteTopic(),
+ messageView.getMessageId().toString(),
finalCompletableFuture);
+ });
+ } else {
+ response = new RocketMQResponse();
+ response.setEnd(true);
+ response.setStream(false);
+ response.setLiteTopic(request.getLiteTopic());
+
response.setMessageId(messageView.getMessageId().toString());
+
response.setResponseBody(toJsonString(nonStreamingResponse));
+ }
+ if (null != response) {
+ SendReceipt send =
this.producer.send(buildMessage(request.getWorkAgentResponseTopic(),
request.getLiteTopic(), response));
+ log.info("RocketMQA2AServerRoutes send
nonStreamingResponse success, msgId: {}, time: {}, " + "response: {}",
send.getMessageId(), System.currentTimeMillis(), JSON.toJSONString(response));
}
- } catch (Exception e) {
- log.error("RocketMQA2AServerRoutes error: {}",
e.getMessage());
- return ConsumeResult.FAILURE;
}
- if (null != completableFuture) {
- try {
- Boolean streamResult = completableFuture.get(15,
TimeUnit.MINUTES);
- if (null != streamResult && streamResult) {
- log.info("RocketMQA2AServerRoutes deal msg
success");
- return ConsumeResult.SUCCESS;
- } else {
- log.info("RocketMQA2AServerRoutes deal msg
failed");
- return ConsumeResult.FAILURE;
- }
- } catch (Exception e) {
- log.error("RocketMQA2AServerRoutes error: {}",
e.getMessage());
+ } catch (Exception e) {
+ log.error("RocketMQA2AServerRoutes error: {}", e.getMessage());
+ return ConsumeResult.FAILURE;
+ }
+ if (null != completableFuture) {
+ try {
+ Boolean streamResult = completableFuture.get(15,
TimeUnit.MINUTES);
+ if (null != streamResult && streamResult) {
+ log.info("RocketMQA2AServerRoutes deal msg success");
+ return ConsumeResult.SUCCESS;
+ } else {
+ log.info("RocketMQA2AServerRoutes deal msg failed");
return ConsumeResult.FAILURE;
}
+ } catch (Exception e) {
+ log.error("RocketMQA2AServerRoutes error: {}",
e.getMessage());
+ return ConsumeResult.FAILURE;
}
- return ConsumeResult.SUCCESS;
- }).build();
- return consumer;
- }
-
- private static Message buildMessage(String topic, String liteTopic,
RocketMQResponse response) {
- if (StringUtils.isEmpty(topic) || StringUtils.isEmpty(liteTopic)) {
- log.error("RocketMQA2AServerRoutes buildMessage param error,
topic: {}, liteTopic: {}, response: {}", topic, liteTopic,
JSON.toJSONString(response));
- return null;
- }
- String missionJsonStr = JSON.toJSONString(response);
- final ClientServiceProvider provider =
ClientServiceProvider.loadService();
- final Message message = provider.newMessageBuilder()
- .setTopic(topic)
- .setBody(missionJsonStr.getBytes(StandardCharsets.UTF_8))
- .setLiteTopic(liteTopic)
- .build();
- return message;
+ }
+ return ConsumeResult.SUCCESS;
+ };
}
private JSONRPCErrorResponse handleError(JsonProcessingException
exception) {
@@ -276,8 +230,8 @@ public class RocketMQA2AServerRoutes extends
A2AServerRoutes {
return new JSONRPCErrorResponse(id, jsonRpcError);
}
- private JSONRPCResponse<?> processNonStreamingRequest(
- NonStreamingJSONRPCRequest<?> request, ServerCallContext context) {
+ private JSONRPCResponse<?>
processNonStreamingRequest(NonStreamingJSONRPCRequest<?> request,
+ ServerCallContext context) {
if (request instanceof GetTaskRequest req) {
return jsonRpcHandler.onGetTask(req, context);
} else if (request instanceof CancelTaskRequest req) {
@@ -299,8 +253,8 @@ public class RocketMQA2AServerRoutes extends
A2AServerRoutes {
}
}
- private Multi<? extends JSONRPCResponse<?>> processStreamingRequest(
- JSONRPCRequest<?> request, ServerCallContext context) {
+ private Multi<? extends JSONRPCResponse<?>>
processStreamingRequest(JSONRPCRequest<?> request,
+ ServerCallContext context) {
Flow.Publisher<? extends JSONRPCResponse<?>> publisher;
if (request instanceof SendStreamingMessageRequest req) {
publisher = jsonRpcHandler.onMessageSendStream(req, context);
@@ -326,9 +280,12 @@ public class RocketMQA2AServerRoutes extends
A2AServerRoutes {
private MultiSseSupport(Producer producer) {
this.producer = producer;
}
- public void writeRocketmq(Multi<Buffer> multi, RoutingContext rc,
String workAgentResponseTopic, String liteTopic, String msgId,
CompletableFuture<Boolean> completableFuture) {
+
+ public void writeRocketmq(Multi<Buffer> multi, RoutingContext rc,
String workAgentResponseTopic,
+ String liteTopic, String msgId, CompletableFuture<Boolean>
completableFuture) {
multi.subscribe().withSubscriber(new Flow.Subscriber<Buffer>() {
Flow.Subscription upstream;
+
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.upstream = subscription;
@@ -342,15 +299,11 @@ public class RocketMQA2AServerRoutes extends
A2AServerRoutes {
@Override
public void onNext(Buffer item) {
try {
- RocketMQResponse response = new RocketMQResponse();
- response.setEnd(false);
- response.setStream(true);
- response.setLiteTopic(liteTopic);
- response.setContextId(response.getContextId());
- response.setMessageId(msgId);
- response.setResponseBody(item.toString());
+ RocketMQResponse response = new
RocketMQResponse(liteTopic, null, item.toString(), msgId, true,
+ false);
SendReceipt send =
producer.send(buildMessage(workAgentResponseTopic, liteTopic, response));
- log.info("MultiSseSupport send response success,
msgId: {}, time: {}, response: {}", send.getMessageId(),
System.currentTimeMillis(), JSON.toJSONString(response));
+ log.info("MultiSseSupport send response success,
msgId: {}, time: {}", send.getMessageId(),
+ System.currentTimeMillis(),
JSON.toJSONString(response));
} catch (Exception e) {
log.error("MultiSseSupport send stream error, {}",
e.getMessage());
}
@@ -365,15 +318,11 @@ public class RocketMQA2AServerRoutes extends
A2AServerRoutes {
@Override
public void onComplete() {
+ RocketMQResponse response = new
RocketMQResponse(liteTopic, null, null, msgId, true, true);
try {
- RocketMQResponse response = new RocketMQResponse();
- response.setEnd(true);
- response.setStream(true);
- response.setLiteTopic(liteTopic);
- response.setContextId(response.getContextId());
- response.setMessageId(msgId);
SendReceipt send =
producer.send(buildMessage(workAgentResponseTopic, liteTopic, response));
- log.info("MultiSseSupport send response success,
msgId: {}, time: {}, response: {}", send.getMessageId(),
System.currentTimeMillis(), JSON.toJSONString(response));
+ log.info("MultiSseSupport send response success,
msgId: {}, time: {}, response: {}",
+ send.getMessageId(), System.currentTimeMillis(),
JSON.toJSONString(response));
} catch (ClientException e) {
log.error("MultiSseSupport error send complete, msgId:
{}", e.getMessage());
}
@@ -382,7 +331,8 @@ public class RocketMQA2AServerRoutes extends
A2AServerRoutes {
});
}
- public void subscribeObjectRocketmq(Multi<Object> multi,
RoutingContext rc, String workAgentResponseTopic, String liteTopic, String
msgId, CompletableFuture<Boolean> completableFuture) {
+ public void subscribeObjectRocketmq(Multi<Object> multi,
RoutingContext rc, String workAgentResponseTopic,
+ String liteTopic, String msgId, CompletableFuture<Boolean>
completableFuture) {
AtomicLong count = new AtomicLong();
Multi<Buffer> map = multi.map(new Function<Object, Buffer>() {
@Override
@@ -400,16 +350,9 @@ public class RocketMQA2AServerRoutes extends
A2AServerRoutes {
}
}
- private static String toJsonString(Object o) {
- try {
- return OBJECT_MAPPER.writeValueAsString(o);
- } catch (JsonProcessingException ex) {
- throw new RuntimeException(ex);
- }
- }
-
private void checkConfigParam() {
- if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT) ||
StringUtils.isEmpty(BIZ_TOPIC) || StringUtils.isEmpty(BIZ_CONSUMER_GROUP)) {
+ if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT) ||
StringUtils.isEmpty(BIZ_TOPIC) || StringUtils.isEmpty(
+ BIZ_CONSUMER_GROUP)) {
if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT)) {
log.error("rocketMQEndpoint is empty");
}
diff --git
a/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransport.java
b/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransport.java
index f47db2e..3daa4c4 100644
--- a/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransport.java
+++ b/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransport.java
@@ -16,26 +16,15 @@
*/
package org.apache.rocketmq.a2a.transport;
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.Nullable;
-import com.alibaba.fastjson.JSON;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
import io.a2a.client.transport.spi.ClientTransport;
-import org.apache.rocketmq.a2a.common.RocketMQRequest;
-import org.apache.rocketmq.a2a.common.RocketMQResponse;
+import org.apache.rocketmq.a2a.common.RocketMQResourceInfo;
import org.apache.rocketmq.a2a.common.RocketMQA2AConstant;
import io.a2a.client.http.A2ACardResolver;
import io.a2a.client.http.A2AHttpClient;
@@ -46,12 +35,10 @@ import
io.a2a.client.transport.spi.interceptors.PayloadAndHeaders;
import io.a2a.spec.A2AClientError;
import io.a2a.spec.A2AClientException;
import io.a2a.spec.AgentCard;
-import io.a2a.spec.AgentInterface;
import io.a2a.spec.CancelTaskRequest;
import io.a2a.spec.CancelTaskResponse;
import io.a2a.spec.DeleteTaskPushNotificationConfigParams;
import io.a2a.spec.DeleteTaskPushNotificationConfigRequest;
-import io.a2a.spec.DeleteTaskPushNotificationConfigResponse;
import io.a2a.spec.EventKind;
import io.a2a.spec.GetAuthenticatedExtendedCardRequest;
import io.a2a.spec.GetAuthenticatedExtendedCardResponse;
@@ -60,9 +47,7 @@ import io.a2a.spec.GetTaskPushNotificationConfigRequest;
import io.a2a.spec.GetTaskPushNotificationConfigResponse;
import io.a2a.spec.GetTaskRequest;
import io.a2a.spec.GetTaskResponse;
-import io.a2a.spec.JSONRPCError;
import io.a2a.spec.JSONRPCMessage;
-import io.a2a.spec.JSONRPCResponse;
import io.a2a.spec.ListTaskPushNotificationConfigParams;
import io.a2a.spec.ListTaskPushNotificationConfigRequest;
import io.a2a.spec.ListTaskPushNotificationConfigResponse;
@@ -77,43 +62,33 @@ import io.a2a.spec.Task;
import io.a2a.spec.TaskIdParams;
import io.a2a.spec.TaskPushNotificationConfig;
import io.a2a.spec.TaskQueryParams;
-import io.a2a.util.Utils;
-import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
-import org.apache.rocketmq.client.apis.ClientServiceProvider;
-import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
-import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
-import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.LitePushConsumer;
-import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
-import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
-import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.shaded.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.rocketmq.a2a.common.RocketMQA2AConstant.DATA_PREFIX;
-import static
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.HTTPS_URL_PREFIX;
-import static
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.HTTP_URL_PREFIX;
import static io.a2a.util.Assert.checkNotNullParam;
+import static
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.CANCEL_TASK_RESPONSE_REFERENCE;
+import static
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.GET_AUTHENTICATED_EXTENDED_CARD_RESPONSE_REFERENCE;
+import static
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.GET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE;
+import static
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.GET_TASK_RESPONSE_REFERENCE;
+import static
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.LIST_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE;
+import static
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.SEND_MESSAGE_RESPONSE_REFERENCE;
+import static
org.apache.rocketmq.a2a.common.RocketMQA2AConstant.SET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE;
+import static
org.apache.rocketmq.a2a.common.RocketMQResourceInfo.parseAgentCardAddition;
+import static
org.apache.rocketmq.a2a.common.RocketMQUtil.LITE_TOPIC_USE_DEFAULT_RECOVER_MAP;
+import static
org.apache.rocketmq.a2a.common.RocketMQUtil.MESSAGE_STREAM_RESPONSE_MAP;
+import static
org.apache.rocketmq.a2a.common.RocketMQUtil.RECOVER_MESSAGE_STREAM_RESPONSE_MAP;
+import static org.apache.rocketmq.a2a.common.RocketMQUtil.checkConfigParam;
+import static org.apache.rocketmq.a2a.common.RocketMQUtil.getResult;
+import static org.apache.rocketmq.a2a.common.RocketMQUtil.initAndGetConsumer;
+import static org.apache.rocketmq.a2a.common.RocketMQUtil.initAndGetProducer;
+import static org.apache.rocketmq.a2a.common.RocketMQUtil.sendRocketMQRequest;
+import static org.apache.rocketmq.a2a.common.RocketMQUtil.unmarshalResponse;
public class RocketMQTransport implements ClientTransport {
private static final Logger log =
LoggerFactory.getLogger(RocketMQTransport.class);
- private static final TypeReference<SendMessageResponse>
SEND_MESSAGE_RESPONSE_REFERENCE = new TypeReference<>() { };
- private static final TypeReference<GetTaskResponse>
GET_TASK_RESPONSE_REFERENCE = new TypeReference<>() { };
- private static final TypeReference<CancelTaskResponse>
CANCEL_TASK_RESPONSE_REFERENCE = new TypeReference<>() { };
- private static final TypeReference<GetTaskPushNotificationConfigResponse>
GET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {
};
- private static final TypeReference<SetTaskPushNotificationConfigResponse>
SET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {
};
- private static final TypeReference<ListTaskPushNotificationConfigResponse>
LIST_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>() {
};
- private static final
TypeReference<DeleteTaskPushNotificationConfigResponse>
DELETE_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE = new TypeReference<>()
{ };
- private static final TypeReference<GetAuthenticatedExtendedCardResponse>
GET_AUTHENTICATED_EXTENDED_CARD_RESPONSE_REFERENCE = new TypeReference<>() { };
- private static final ConcurrentMap<String /* namespace */, Map<String /*
WorkerAgentResponseTopic */, LitePushConsumer>> ROCKETMQ_CONSUMER_MAP = new
ConcurrentHashMap<>();
- private static final ConcurrentMap<String /* namespace */, Map<String /*
agentTopic */, Producer>> ROCKETMQ_PRODUCER_MAP = new ConcurrentHashMap<>();
- private static final ConcurrentMap<String /* namespace */, Map<String /*
msgId */, CompletableFuture<String>>> MESSAGE_RESPONSE_MAP = new
ConcurrentHashMap<>();
- private static final ConcurrentMap<String /* namespace */, Map<String /*
msgId */, SSEEventListener>> MESSAGE_STREAM_RESPONSE_MAP = new
ConcurrentHashMap<>();
- private static final ConcurrentMap<String /* namespace */, Map<String /*
liteTopic */, Boolean>> LITE_TOPIC_USE_DEFAULT_RECOVER_MAP = new
ConcurrentHashMap<>();
- private static final ConcurrentMap<String /* namespace */, Map<String /*
Key */, SSEEventListener>> RECOVER_MESSAGE_STREAM_RESPONSE_MAP = new
ConcurrentHashMap<>();
-
private final String agentTopic;
private final String accessKey;
private final String secretKey;
@@ -157,65 +132,25 @@ public class RocketMQTransport implements ClientTransport
{
this.agentTopic = rocketAgentCardInfo.getTopic();
this.namespace =
StringUtils.isEmpty(rocketAgentCardInfo.getNamespace()) ? "" :
rocketAgentCardInfo.getNamespace();
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.computeIfAbsent(this.namespace, k
-> new HashMap<>()).put(this.liteTopic, useDefaultRecoverMode);
- checkConfigParam();
- initRocketMQProducerAndConsumer();
- }
-
- private void initRocketMQProducerAndConsumer() {
- if (StringUtils.isEmpty(this.endpoint) ||
StringUtils.isEmpty(this.workAgentResponseTopic) ||
StringUtils.isEmpty(this.liteTopic)) {
- throw new A2AClientException("RocketMQTransport
initRocketMQProducerAndConsumer param error");
- }
+ checkConfigParam(this.endpoint, this.workAgentResponseTopic,
this.workAgentResponseGroupID, this.liteTopic, this.agentTopic);
try {
- Map<String, LitePushConsumer> consumerMap =
ROCKETMQ_CONSUMER_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
- if (consumerMap.containsKey(this.workAgentResponseTopic)) {
- this.litePushConsumer =
consumerMap.get(this.workAgentResponseTopic);
- this.litePushConsumer.subscribeLite(this.liteTopic);
- } else {
- LitePushConsumer litePushConsumer =
consumerMap.computeIfAbsent(this.workAgentResponseTopic, k -> {
- try {
- return buildConsumer();
- } catch (ClientException e) {
- log.error("RocketMQTransport
initRocketMQProducerAndConsumer buildConsumer error: {}", e.getMessage());
- throw new RuntimeException(e);
- }
- });
- if (null != litePushConsumer) {
- litePushConsumer.subscribeLite(this.liteTopic);
- this.litePushConsumer = litePushConsumer;
- }
- }
- Map<String, Producer> producerMap =
ROCKETMQ_PRODUCER_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
- if (!producerMap.containsKey(this.agentTopic)) {
- this.producer = buildProducer(this.agentTopic);
- producerMap.put(this.agentTopic, this.producer);
- }
- log.info("RocketMQTransport initRocketMQProducerAndConsumer
success");
- } catch (Exception e) {
- log.error("RocketMQTransport initRocketMQProducerAndConsumer
error: {}", e.getMessage());
+ this.litePushConsumer = initAndGetConsumer(this.namespace,
this.endpoint, this.accessKey, this.secretKey, this.workAgentResponseTopic,
this.workAgentResponseGroupID, this.liteTopic);
+ this.producer = initAndGetProducer(this.namespace, this.endpoint,
this.accessKey, this.secretKey, this.agentTopic);
+ } catch (ClientException e) {
+ log.error("RocketMQTransport init rocketmq client error, e: {}",
e.getMessage());
+ throw new RuntimeException("RocketMQTransport init rocketmq client
error");
}
}
@Override
public EventKind sendMessage(MessageSendParams request, ClientCallContext
context) throws A2AClientException {
- SendMessageRequest sendMessageRequest = new
SendMessageRequest.Builder()
- .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
- .method(SendMessageRequest.METHOD)
- .params(request)
- .build();
+ checkNotNullParam("request", request);
+ SendMessageRequest sendMessageRequest = new
SendMessageRequest.Builder().jsonrpc(JSONRPCMessage.JSONRPC_VERSION).method(SendMessageRequest.METHOD).params(request).build();
PayloadAndHeaders payloadAndHeaders =
applyInterceptors(SendMessageRequest.METHOD, sendMessageRequest,
this.agentCard, context);
try {
String liteTopic = dealLiteTopic(request.message().getContextId());
- String responseMessageId = sendRocketMQRequest(payloadAndHeaders,
liteTopic);
- if (StringUtils.isEmpty(responseMessageId)) {
- log.error("RocketMQTransport sendMessage error,
responseMessageId is null");
- return null;
- }
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
- CompletableFuture<String> objectCompletableFuture = new
CompletableFuture<>();
- completableFutureMap.put(responseMessageId,
objectCompletableFuture);
- String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
- completableFutureMap.remove(responseMessageId);
- SendMessageResponse response =
unmarshalResponse(String.valueOf(result), SEND_MESSAGE_RESPONSE_REFERENCE);
+ String responseMessageId = sendRocketMQRequest(payloadAndHeaders,
this.agentTopic, liteTopic, this.workAgentResponseTopic, this.producer);
+ SendMessageResponse response =
unmarshalResponse(getResult(responseMessageId, this.namespace),
SEND_MESSAGE_RESPONSE_REFERENCE);
return response.getResult();
} catch (Exception e) {
log.error("RocketMQTransport sendMessage error: {}",
e.getMessage());
@@ -227,16 +162,12 @@ public class RocketMQTransport implements ClientTransport
{
public void sendMessageStreaming(MessageSendParams request,
Consumer<StreamingEventKind> eventConsumer, Consumer<Throwable> errorConsumer,
ClientCallContext context) throws A2AClientException {
checkNotNullParam("request", request);
checkNotNullParam("eventConsumer", eventConsumer);
- SendStreamingMessageRequest sendStreamingMessageRequest = new
SendStreamingMessageRequest.Builder()
- .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
- .method(SendStreamingMessageRequest.METHOD)
- .params(request)
- .build();
+ SendStreamingMessageRequest sendStreamingMessageRequest = new
SendStreamingMessageRequest.Builder().jsonrpc(JSONRPCMessage.JSONRPC_VERSION).method(SendStreamingMessageRequest.METHOD).params(request).build();
PayloadAndHeaders payloadAndHeaders =
applyInterceptors(SendStreamingMessageRequest.METHOD,
sendStreamingMessageRequest, this.agentCard, context);
SSEEventListener sseEventListener = new
SSEEventListener(eventConsumer, errorConsumer);
try {
String liteTopic = dealLiteTopic(request.message().getContextId());
- String responseMessageId = sendRocketMQRequest(payloadAndHeaders,
liteTopic);
+ String responseMessageId = sendRocketMQRequest(payloadAndHeaders,
this.agentTopic, liteTopic, this.workAgentResponseTopic, this.producer);
if (StringUtils.isEmpty(responseMessageId)) {
log.error("RocketMQTransport sendMessageStreaming error,
responseMessageId is null");
return;
@@ -266,7 +197,6 @@ public class RocketMQTransport implements ClientTransport {
log.info("litePushConsumer subscribeLite liteTopic: {}",
liteTopic);
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.computeIfAbsent(this.namespace, k -> new
HashMap<>()).put(liteTopic, this.useDefaultRecoverMode);
}
-
String closeLiteTopic =
(String)request.metadata().get(RocketMQA2AConstant.CLOSE_LITE_TOPIC);
if (null != litePushConsumer &&
!StringUtils.isEmpty(closeLiteTopic)) {
litePushConsumer.unsubscribeLite(closeLiteTopic);
@@ -284,24 +214,12 @@ public class RocketMQTransport implements ClientTransport
{
@Override
public Task getTask(TaskQueryParams request, ClientCallContext context)
throws A2AClientException {
- GetTaskRequest getTaskRequest = new GetTaskRequest.Builder()
- .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
- .method(GetTaskRequest.METHOD)
- .params(request)
- .build();
+ checkNotNullParam("request", request);
+ GetTaskRequest getTaskRequest = new
GetTaskRequest.Builder().jsonrpc(JSONRPCMessage.JSONRPC_VERSION).method(GetTaskRequest.METHOD).params(request).build();
PayloadAndHeaders payloadAndHeaders =
applyInterceptors(GetTaskRequest.METHOD, getTaskRequest, this.agentCard,
context);
try {
- String responseMessageId = sendRocketMQRequest(payloadAndHeaders,
this.liteTopic);
- if (StringUtils.isEmpty(responseMessageId)) {
- log.error("RocketMQTransport getTask error, responseMessageId
is null");
- return null;
- }
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
- CompletableFuture<String> objectCompletableFuture = new
CompletableFuture<>();
- completableFutureMap.put(responseMessageId,
objectCompletableFuture);
- String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
- completableFutureMap.remove(responseMessageId);
- GetTaskResponse response = unmarshalResponse(result,
GET_TASK_RESPONSE_REFERENCE);
+ String responseMessageId = sendRocketMQRequest(payloadAndHeaders,
this.agentTopic, liteTopic, this.workAgentResponseTopic, this.producer);
+ GetTaskResponse response =
unmarshalResponse(getResult(responseMessageId, this.namespace),
GET_TASK_RESPONSE_REFERENCE);
return response.getResult();
} catch (Exception e) {
log.error("RocketMQTransport getTask error: {}", e.getMessage());
@@ -312,24 +230,11 @@ public class RocketMQTransport implements ClientTransport
{
@Override
public Task cancelTask(TaskIdParams request, ClientCallContext context)
throws A2AClientException {
checkNotNullParam("request", request);
- CancelTaskRequest cancelTaskRequest = new CancelTaskRequest.Builder()
- .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
- .method(CancelTaskRequest.METHOD)
- .params(request)
- .build();
+ CancelTaskRequest cancelTaskRequest = new
CancelTaskRequest.Builder().jsonrpc(JSONRPCMessage.JSONRPC_VERSION).method(CancelTaskRequest.METHOD).params(request).build();
PayloadAndHeaders payloadAndHeaders =
applyInterceptors(CancelTaskRequest.METHOD, cancelTaskRequest, this.agentCard,
context);
try {
- String responseMessageId = sendRocketMQRequest(payloadAndHeaders,
this.liteTopic);
- if (StringUtils.isEmpty(responseMessageId)) {
- log.error("RocketMQTransport cancelTask error,
responseMessageId is null");
- return null;
- }
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
- CompletableFuture<String> objectCompletableFuture = new
CompletableFuture<>();
- completableFutureMap.put(responseMessageId,
objectCompletableFuture);
- String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
- completableFutureMap.remove(responseMessageId);
- CancelTaskResponse response = unmarshalResponse(result,
CANCEL_TASK_RESPONSE_REFERENCE);
+ String responseMessageId = sendRocketMQRequest(payloadAndHeaders,
this.agentTopic, liteTopic, this.workAgentResponseTopic, this.producer);
+ CancelTaskResponse response =
unmarshalResponse(getResult(responseMessageId, this.namespace),
CANCEL_TASK_RESPONSE_REFERENCE);
return response.getResult();
} catch (Exception e) {
log.error("RocketMQTransport cancelTask error: {}",
e.getMessage());
@@ -340,25 +245,11 @@ public class RocketMQTransport implements ClientTransport
{
@Override
public TaskPushNotificationConfig
setTaskPushNotificationConfiguration(TaskPushNotificationConfig request,
ClientCallContext context) throws A2AClientException {
checkNotNullParam("request", request);
- SetTaskPushNotificationConfigRequest setTaskPushNotificationRequest =
new SetTaskPushNotificationConfigRequest.Builder()
- .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
- .method(SetTaskPushNotificationConfigRequest.METHOD)
- .params(request)
- .build();
-
+ SetTaskPushNotificationConfigRequest setTaskPushNotificationRequest =
new
SetTaskPushNotificationConfigRequest.Builder().jsonrpc(JSONRPCMessage.JSONRPC_VERSION).method(SetTaskPushNotificationConfigRequest.METHOD).params(request).build();
PayloadAndHeaders payloadAndHeaders =
applyInterceptors(SetTaskPushNotificationConfigRequest.METHOD,
setTaskPushNotificationRequest, agentCard, context);
try {
- String responseMessageId = sendRocketMQRequest(payloadAndHeaders,
this.liteTopic);
- if (StringUtils.isEmpty(responseMessageId)) {
- log.error("RocketMQTransport
setTaskPushNotificationConfiguration error, responseMessageId is null");
- return null;
- }
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
- CompletableFuture<String> objectCompletableFuture = new
CompletableFuture<>();
- completableFutureMap.put(responseMessageId,
objectCompletableFuture);
- String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
- completableFutureMap.remove(responseMessageId);
- SetTaskPushNotificationConfigResponse response =
unmarshalResponse(result, SET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE);
+ String responseMessageId = sendRocketMQRequest(payloadAndHeaders,
this.agentTopic, liteTopic, this.workAgentResponseTopic, this.producer);
+ SetTaskPushNotificationConfigResponse response =
unmarshalResponse(getResult(responseMessageId, this.namespace),
SET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE);
return response.getResult();
} catch (Exception e) {
log.error("RocketMQTransport setTaskPushNotificationConfiguration
error: {}", e.getMessage());
@@ -369,26 +260,11 @@ public class RocketMQTransport implements ClientTransport
{
@Override
public TaskPushNotificationConfig
getTaskPushNotificationConfiguration(GetTaskPushNotificationConfigParams
request, ClientCallContext context) throws A2AClientException {
checkNotNullParam("request", request);
- GetTaskPushNotificationConfigRequest getTaskPushNotificationRequest
- = new GetTaskPushNotificationConfigRequest.Builder()
- .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
- .method(GetTaskPushNotificationConfigRequest.METHOD)
- .params(request)
- .build();
-
+ GetTaskPushNotificationConfigRequest getTaskPushNotificationRequest =
new
GetTaskPushNotificationConfigRequest.Builder().jsonrpc(JSONRPCMessage.JSONRPC_VERSION).method(GetTaskPushNotificationConfigRequest.METHOD).params(request).build();
PayloadAndHeaders payloadAndHeaders =
applyInterceptors(GetTaskPushNotificationConfigRequest.METHOD,
getTaskPushNotificationRequest, this.agentCard, context);
try {
- String responseMessageId = sendRocketMQRequest(payloadAndHeaders,
this.liteTopic);
- if (StringUtils.isEmpty(responseMessageId)) {
- log.error("RocketMQTransport
getTaskPushNotificationConfiguration error, responseMessageId is null");
- return null;
- }
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
- CompletableFuture<String> completableFuture = new
CompletableFuture<>();
- completableFutureMap.put(responseMessageId, completableFuture);
- String result = completableFuture.get(120, TimeUnit.SECONDS);
- completableFutureMap.remove(responseMessageId);
- GetTaskPushNotificationConfigResponse response =
unmarshalResponse(result, GET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE);
+ String responseMessageId = sendRocketMQRequest(payloadAndHeaders,
this.agentTopic, liteTopic, this.workAgentResponseTopic, this.producer);
+ GetTaskPushNotificationConfigResponse response =
unmarshalResponse(getResult(responseMessageId, this.namespace),
GET_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE);
return response.getResult();
} catch (Exception e) {
log.error("RocketMQTransport getTaskPushNotificationConfiguration
error: {}", e.getMessage());
@@ -399,24 +275,11 @@ public class RocketMQTransport implements ClientTransport
{
@Override
public List<TaskPushNotificationConfig>
listTaskPushNotificationConfigurations(ListTaskPushNotificationConfigParams
request, ClientCallContext context) throws A2AClientException {
checkNotNullParam("request", request);
- ListTaskPushNotificationConfigRequest listTaskPushNotificationRequest
= new ListTaskPushNotificationConfigRequest.Builder()
- .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
- .method(ListTaskPushNotificationConfigRequest.METHOD)
- .params(request)
- .build();
+ ListTaskPushNotificationConfigRequest listTaskPushNotificationRequest
= new
ListTaskPushNotificationConfigRequest.Builder().jsonrpc(JSONRPCMessage.JSONRPC_VERSION).method(ListTaskPushNotificationConfigRequest.METHOD).params(request).build();
PayloadAndHeaders payloadAndHeaders =
applyInterceptors(ListTaskPushNotificationConfigRequest.METHOD,
listTaskPushNotificationRequest, this.agentCard, context);
try {
- String responseMessageId = sendRocketMQRequest(payloadAndHeaders,
this.liteTopic);
- if (StringUtils.isEmpty(responseMessageId)) {
- log.error("RocketMQTransport
listTaskPushNotificationConfigurations error, responseMessageId is null");
- return null;
- }
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
- CompletableFuture<String> objectCompletableFuture = new
CompletableFuture<>();
- completableFutureMap.put(responseMessageId,
objectCompletableFuture);
- String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
- completableFutureMap.remove(responseMessageId);
- ListTaskPushNotificationConfigResponse response =
unmarshalResponse(result,
LIST_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE);
+ String responseMessageId = sendRocketMQRequest(payloadAndHeaders,
this.agentTopic, liteTopic, this.workAgentResponseTopic, this.producer);
+ ListTaskPushNotificationConfigResponse response =
unmarshalResponse(getResult(responseMessageId, this.namespace),
LIST_TASK_PUSH_NOTIFICATION_CONFIG_RESPONSE_REFERENCE);
return response.getResult();
} catch (Exception e) {
log.error("RocketMQTransport
listTaskPushNotificationConfigurations error: {}", e.getMessage());
@@ -427,23 +290,11 @@ public class RocketMQTransport implements ClientTransport
{
@Override
public void
deleteTaskPushNotificationConfigurations(DeleteTaskPushNotificationConfigParams
request, ClientCallContext context) throws A2AClientException {
checkNotNullParam("request", request);
- DeleteTaskPushNotificationConfigRequest
deleteTaskPushNotificationRequest = new
DeleteTaskPushNotificationConfigRequest.Builder()
- .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
- .method(DeleteTaskPushNotificationConfigRequest.METHOD)
- .params(request)
- .build();
+ DeleteTaskPushNotificationConfigRequest
deleteTaskPushNotificationRequest = new
DeleteTaskPushNotificationConfigRequest.Builder().jsonrpc(JSONRPCMessage.JSONRPC_VERSION).method(DeleteTaskPushNotificationConfigRequest.METHOD).params(request).build();
PayloadAndHeaders payloadAndHeaders =
applyInterceptors(DeleteTaskPushNotificationConfigRequest.METHOD,
deleteTaskPushNotificationRequest, agentCard, context);
try {
- String responseMessageId = sendRocketMQRequest(payloadAndHeaders,
this.liteTopic);
- if (StringUtils.isEmpty(responseMessageId)) {
- log.error("RocketMQTransport
deleteTaskPushNotificationConfigurations error, responseMessageId is null");
- return;
- }
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
- CompletableFuture<String> objectCompletableFuture = new
CompletableFuture<>();
- completableFutureMap.put(responseMessageId,
objectCompletableFuture);
- objectCompletableFuture.get(120, TimeUnit.SECONDS);
- completableFutureMap.remove(responseMessageId);
+ String responseMessageId = sendRocketMQRequest(payloadAndHeaders,
this.agentTopic, liteTopic, this.workAgentResponseTopic, this.producer);
+ getResult(responseMessageId, this.namespace);
} catch (Exception e) {
log.error("RocketMQTransport
deleteTaskPushNotificationConfigurations error: {}", e.getMessage());
}
@@ -462,22 +313,10 @@ public class RocketMQTransport implements ClientTransport
{
return agentCard;
}
try {
- GetAuthenticatedExtendedCardRequest
getExtendedAgentCardRequest = new GetAuthenticatedExtendedCardRequest.Builder()
- .jsonrpc(JSONRPCMessage.JSONRPC_VERSION)
- .method(GetAuthenticatedExtendedCardRequest.METHOD)
- .build(); // id will be randomly generated
+ GetAuthenticatedExtendedCardRequest
getExtendedAgentCardRequest = new
GetAuthenticatedExtendedCardRequest.Builder().jsonrpc(JSONRPCMessage.JSONRPC_VERSION).method(GetAuthenticatedExtendedCardRequest.METHOD).build();
// id will be randomly generated
PayloadAndHeaders payloadAndHeaders =
applyInterceptors(GetAuthenticatedExtendedCardRequest.METHOD,
getExtendedAgentCardRequest, this.agentCard, context);
- String responseMessageId =
sendRocketMQRequest(payloadAndHeaders, this.liteTopic);
- if (StringUtils.isEmpty(responseMessageId)) {
- log.error("RocketMQTransport getAgentCard
responseMessageId is null");
- return null;
- }
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
- CompletableFuture<String> objectCompletableFuture = new
CompletableFuture<>();
- completableFutureMap.put(responseMessageId,
objectCompletableFuture);
- String result = objectCompletableFuture.get(120,
TimeUnit.SECONDS);
- completableFutureMap.remove(responseMessageId);
- GetAuthenticatedExtendedCardResponse response =
unmarshalResponse(result, GET_AUTHENTICATED_EXTENDED_CARD_RESPONSE_REFERENCE);
+ String responseMessageId =
sendRocketMQRequest(payloadAndHeaders, this.agentTopic, liteTopic,
this.workAgentResponseTopic, this.producer);
+ GetAuthenticatedExtendedCardResponse response =
unmarshalResponse(getResult(responseMessageId, this.namespace),
GET_AUTHENTICATED_EXTENDED_CARD_RESPONSE_REFERENCE);
return response.getResult();
} catch (Exception e) {
throw new A2AClientException("RocketMQTransport getAgentCard
error: " + e, e);
@@ -488,42 +327,7 @@ public class RocketMQTransport implements ClientTransport {
}
@Override
- public void close() {
- try {
- if (null != this.producer) {
- this.producer.close();
- }
- if (null != this.litePushConsumer) {
- this.litePushConsumer.close();
- }
- log.info("RocketMQTransport close success");
- } catch (Exception e) {
- log.error("RocketMQTransport close error: {}", e.getMessage());
- }
- }
-
- private void checkConfigParam() {
- if (StringUtils.isEmpty(this.endpoint) ||
StringUtils.isEmpty(this.workAgentResponseTopic) ||
- StringUtils.isEmpty(this.workAgentResponseGroupID) ||
StringUtils.isEmpty(this.liteTopic) || StringUtils.isEmpty(agentTopic)) {
-
- if (StringUtils.isEmpty(this.endpoint)) {
- log.error("RocketMQTransport checkConfigParam endpoint is
empty");
- }
- if (StringUtils.isEmpty(this.workAgentResponseTopic)) {
- log.error("RocketMQTransport checkConfigParam
workAgentResponseTopic is empty");
- }
- if (StringUtils.isEmpty(this.workAgentResponseGroupID)) {
- log.error("RocketMQTransport checkConfigParam
workAgentResponseGroupID is empty");
- }
- if (StringUtils.isEmpty(this.liteTopic)) {
- log.error("RocketMQTransport checkConfigParam liteTopic is
empty");
- }
- if (StringUtils.isEmpty(this.agentTopic)) {
- log.error("RocketMQTransport checkConfigParam agentTopic is
empty");
- }
- throw new RuntimeException("RocketMQTransport checkConfigParam
error, init failed !!!");
- }
- }
+ public void close() {}
private String dealLiteTopic(String contextId) {
String liteTopic = this.liteTopic;
@@ -532,200 +336,12 @@ public class RocketMQTransport implements
ClientTransport {
litePushConsumer.subscribeLite(contextId);
liteTopic = contextId;
} catch (ClientException e) {
-
+ log.error("dealLiteTopic error: {}", e.getMessage());
}
}
return liteTopic;
}
- private LitePushConsumer buildConsumer() throws ClientException {
- if (StringUtils.isEmpty(this.endpoint) ||
StringUtils.isEmpty(this.workAgentResponseGroupID) ||
StringUtils.isEmpty(this.workAgentResponseTopic)) {
- log.error("RocketMQTransport buildConsumer check param error");
- return null;
- }
- final ClientServiceProvider provider =
ClientServiceProvider.loadService();
- SessionCredentialsProvider sessionCredentialsProvider = new
StaticSessionCredentialsProvider(accessKey, secretKey);
- ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
- .setEndpoints(this.endpoint)
- .setNamespace(this.namespace)
- .setCredentialProvider(sessionCredentialsProvider)
- .build();
- LitePushConsumer litePushConsumer =
provider.newLitePushConsumerBuilder()
- .setClientConfiguration(clientConfiguration)
- .setConsumerGroup(this.workAgentResponseGroupID)
- .bindTopic(this.workAgentResponseTopic)
- .setMessageListener(messageView -> {
- try {
- Optional<String> liteTopicOpt = messageView.getLiteTopic();
- String liteTopic = liteTopicOpt.get();
- if (StringUtils.isEmpty(liteTopic)) {
- log.error("RocketMQTransport buildConsumer liteTopic
is empty");
- return ConsumeResult.SUCCESS;
- }
- byte[] result = new
byte[messageView.getBody().remaining()];
- messageView.getBody().get(result);
- String resultStr = new String(result,
StandardCharsets.UTF_8);
- RocketMQResponse response = JSON.parseObject(resultStr,
RocketMQResponse.class);
- if (null == response ||
StringUtils.isEmpty(response.getMessageId())) {
- log.error("RocketMQTransport litePushConsumer consumer
error, response is null or messageId is empty");
- return ConsumeResult.SUCCESS;
- }
- if (!response.isStream()) {
- return dealNonStreamResult(response, this.namespace);
- }
- return dealStreamResult(response, this.namespace,
liteTopic);
- } catch (Exception e) {
- log.error("RocketMQTransport litePushConsumer consumer
error, msgId: {}, error: {}", messageView.getMessageId(), e.getMessage());
- return ConsumeResult.SUCCESS;
- }
- }).build();
- return litePushConsumer;
- }
-
- private ConsumeResult dealStreamResult(RocketMQResponse response, String
namespace, String liteTopic) {
- if (null == response || StringUtils.isEmpty(response.getMessageId())
|| StringUtils.isEmpty(liteTopic) || !response.isEnd() &&
StringUtils.isEmpty(response.getResponseBody())) {
- log.error("RocketMQTransport dealStreamResult param is error,
response: {}, liteTopic: {}", JSON.toJSONString(response), liteTopic);
- return ConsumeResult.SUCCESS;
- }
-
- Map<String, SSEEventListener> sseEventListenerMap =
MESSAGE_STREAM_RESPONSE_MAP.get(namespace);
- if (null == sseEventListenerMap) {
- log.error("RocketMQTransport dealStreamResult sseEventListenerMap
is null");
- return ConsumeResult.SUCCESS;
- }
- SSEEventListener sseEventListener =
sseEventListenerMap.get(response.getMessageId());
- if (null == sseEventListener) {
- Map<String, Boolean> booleanMap =
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.get(namespace);
- if (null == booleanMap) {
- log.error("RocketMQTransport dealStreamResult booleanMap is
null");
- return ConsumeResult.SUCCESS;
- }
- Boolean useDefaultRecoverModeConsumer = booleanMap.get(liteTopic);
- if (null == useDefaultRecoverModeConsumer ||
!useDefaultRecoverModeConsumer) {
- return ConsumeResult.SUCCESS;
- }
- if (!RECOVER_MESSAGE_STREAM_RESPONSE_MAP.isEmpty() &&
RECOVER_MESSAGE_STREAM_RESPONSE_MAP.containsKey(namespace)) {
- Map<String, SSEEventListener> sseEventListenerMapRecover =
RECOVER_MESSAGE_STREAM_RESPONSE_MAP.get(namespace);
- if (null == sseEventListenerMapRecover) {
- log.error("RocketMQTransport dealStreamResult
sseEventListenerMapRecover is null");
- return ConsumeResult.SUCCESS;
- }
- sseEventListener =
sseEventListenerMapRecover.get(RocketMQA2AConstant.DEFAULT_STREAM_RECOVER);
- if (null == sseEventListener) {
- log.error("RocketMQTransport dealStreamResult
sseEventListenerMapRecover get sseEventListener is null");
- return ConsumeResult.SUCCESS;
- }
- }
- if (null == sseEventListener) {
- return ConsumeResult.SUCCESS;
- }
- }
- String item = response.getResponseBody();
- if (!StringUtils.isEmpty(item) && item.startsWith(DATA_PREFIX)) {
- item = item.substring(5).trim();
- if (!item.isEmpty()) {
- try {
- sseEventListener.onMessage(item, new
CompletableFuture<>());
- } catch (Throwable e) {
- log.error("RocketMQTransport dealStreamResult error: {}",
e.getMessage());
- return ConsumeResult.FAILURE;
- }
- }
- if (response.isEnd() &&
!StringUtils.isEmpty(response.getMessageId())) {
- sseEventListenerMap.remove(response.getMessageId());
- }
- }
- return ConsumeResult.SUCCESS;
- }
-
- private ConsumeResult dealNonStreamResult(RocketMQResponse response,
String namespace) {
- if (null == response || StringUtils.isEmpty(response.getMessageId())
|| StringUtils.isEmpty(response.getResponseBody())) {
- log.error("RocketMQTransport dealNonStreamResult param is error,
response: {}", JSON.toJSONString(response));
- return ConsumeResult.SUCCESS;
- }
- Map<String, CompletableFuture<String>> completableFutureMap =
MESSAGE_RESPONSE_MAP.get(namespace);
- if (null != completableFutureMap &&
completableFutureMap.containsKey(response.getMessageId())) {
- CompletableFuture<String> completableFuture =
completableFutureMap.get(response.getMessageId());
- completableFuture.complete(response.getResponseBody());
- }
- return ConsumeResult.SUCCESS;
- }
-
- private String sendRocketMQRequest(PayloadAndHeaders payloadAndHeaders,
String liteTopic) throws JsonProcessingException {
- if (null == payloadAndHeaders || StringUtils.isEmpty(this.agentTopic)
|| StringUtils.isEmpty(liteTopic) ||
StringUtils.isEmpty(this.workAgentResponseTopic)) {
- log.error("RocketMQTransport sendRocketMQRequest error,
payloadAndHeaders: {}, agentTopic: {}, workAgentResponseTopic: {}, liteTopic:
{}", payloadAndHeaders, this.agentTopic, this.workAgentResponseTopic,
this.liteTopic);
- return null;
- }
- RocketMQRequest request = new RocketMQRequest();
-
request.setRequestBody(Utils.OBJECT_MAPPER.writeValueAsString(payloadAndHeaders.getPayload()));
- request.setAgentTopic(this.agentTopic);
- request.setWorkAgentResponseTopic(this.workAgentResponseTopic);
- request.setLiteTopic(liteTopic);
- if (payloadAndHeaders.getHeaders() != null) {
- for (Map.Entry<String, String> entry :
payloadAndHeaders.getHeaders().entrySet()) {
- request.addHeader(entry.getKey(), entry.getValue());
- }
- }
- String messageBodyStr = serialText(request);
- if (StringUtils.isEmpty(messageBodyStr)) {
- return null;
- }
- final ClientServiceProvider provider =
ClientServiceProvider.loadService();
- if (null == producer) {
- log.error("RocketMQTransport sendRocketMQRequest producer is null,
agentTopic: {}", this.agentTopic);
- return null;
- }
- byte[] body = messageBodyStr.getBytes(StandardCharsets.UTF_8);
- final Message message = provider.newMessageBuilder()
- .setTopic(this.agentTopic)
- .setBody(body)
- .build();
- try {
- final SendReceipt sendReceipt = producer.send(message);
- if (!StringUtils.isEmpty(sendReceipt.getMessageId().toString())) {
- return sendReceipt.getMessageId().toString();
- }
- } catch (Throwable t) {
- return null;
- }
- return null;
- }
- private static void printPrompt(String role) {
- System.out.print("\n\u001B[36m" + role + " > \u001B[0m");
- }
-
- private <T extends JSONRPCResponse<?>> T unmarshalResponse(String
response, TypeReference<T> typeReference)
- throws A2AClientException, JsonProcessingException {
- T value = Utils.unmarshalFrom(response, typeReference);
- JSONRPCError error = value.getError();
- if (error != null) {
- throw new A2AClientException(error.getMessage() + (error.getData()
!= null ? ": " + error.getData() : ""), error);
- }
- return value;
- }
-
- private static String serialText(RocketMQRequest rocketMQRequest) {
- if (null == rocketMQRequest ||
StringUtils.isEmpty(rocketMQRequest.getRequestBody()) ||
StringUtils.isEmpty(rocketMQRequest.getWorkAgentResponseTopic()) ||
StringUtils.isEmpty(rocketMQRequest.getLiteTopic()) ||
StringUtils.isEmpty(rocketMQRequest.getAgentTopic())) {
- return null;
- }
- return JSON.toJSONString(rocketMQRequest);
- }
-
- private Producer buildProducer(String... topics) throws ClientException {
- final ClientServiceProvider provider =
ClientServiceProvider.loadService();
- SessionCredentialsProvider sessionCredentialsProvider = new
StaticSessionCredentialsProvider(accessKey, secretKey);
- ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
- .setEndpoints(this.endpoint)
- .setNamespace(this.namespace)
- .setCredentialProvider(sessionCredentialsProvider)
- .setRequestTimeout(Duration.ofSeconds(15))
- .build();
- final ProducerBuilder builder = provider.newProducerBuilder()
- .setClientConfiguration(clientConfiguration)
- .setTopics(topics);
- return builder.build();
- }
-
private PayloadAndHeaders applyInterceptors(String methodName, Object
payload, AgentCard agentCard, ClientCallContext clientCallContext) {
PayloadAndHeaders payloadAndHeaders = new PayloadAndHeaders(payload,
getHttpHeaders(clientCallContext));
if (interceptors != null && !interceptors.isEmpty()) {
@@ -739,89 +355,4 @@ public class RocketMQTransport implements ClientTransport {
private Map<String, String> getHttpHeaders(@Nullable ClientCallContext
context) {
return context != null ? context.getHeaders() : Collections.emptyMap();
}
-
- private RocketMQResourceInfo parseAgentCardAddition(AgentCard agentCard) {
- if (null == agentCard ||
StringUtils.isEmpty(agentCard.preferredTransport()) ||
StringUtils.isEmpty(agentCard.url()) || null ==
agentCard.additionalInterfaces() || agentCard.additionalInterfaces().isEmpty())
{
- log.error("parseAgentCardAddition param error, agentCard: {}",
JSON.toJSONString(agentCard));
- return null;
- }
- RocketMQResourceInfo rocketMQResourceInfo = null;
- String preferredTransport = agentCard.preferredTransport();
- if (RocketMQA2AConstant.ROCKETMQ_PROTOCOL.equals(preferredTransport)) {
- String url = agentCard.url();
- rocketMQResourceInfo = pareAgentCardUrl(url);
- if (null != rocketMQResourceInfo &&
!StringUtils.isEmpty(rocketMQResourceInfo.getEndpoint()) &&
!StringUtils.isEmpty(rocketMQResourceInfo.getTopic())) {
- log.info("RocketMQTransport get rocketMQResourceInfo from
preferredTransport");
- return rocketMQResourceInfo;
- }
- }
- List<AgentInterface> agentInterfaces =
agentCard.additionalInterfaces();
- for (AgentInterface agentInterface : agentInterfaces) {
- String transport = agentInterface.transport();
- if (!StringUtils.isEmpty(transport) &&
RocketMQA2AConstant.ROCKETMQ_PROTOCOL.equals(transport)) {
- String url = agentInterface.url();
- rocketMQResourceInfo = pareAgentCardUrl(url);
- if (null != rocketMQResourceInfo &&
!StringUtils.isEmpty(rocketMQResourceInfo.getEndpoint()) &&
!StringUtils.isEmpty(rocketMQResourceInfo.getTopic())) {
- log.error("RocketMQTransport get rocketMQResourceInfo from
additionalInterfaces");
- return rocketMQResourceInfo;
- }
- }
- }
- return null;
- }
-
- private static RocketMQResourceInfo pareAgentCardUrl(String agentCardUrl) {
- if (StringUtils.isEmpty(agentCardUrl)) {
- return null;
- }
- String agentUrl = agentCardUrl.replace(HTTP_URL_PREFIX, "");
- String replaceFinal = agentUrl.replace(HTTPS_URL_PREFIX, "");
- String[] split = replaceFinal.split("/");
- if (split.length != 3) {
- return null;
- }
- RocketMQResourceInfo rocketMQResourceInfo = new RocketMQResourceInfo();
- rocketMQResourceInfo.setEndpoint(split[0].trim());
- rocketMQResourceInfo.setNamespace(split[1].trim());
- rocketMQResourceInfo.setTopic(split[2].trim());
- return rocketMQResourceInfo;
- }
-
- private static class RocketMQResourceInfo {
- private String endpoint;
- private String topic;
- private String namespace;
-
- public RocketMQResourceInfo(String endpoint, String topic) {
- this.endpoint = endpoint;
- this.topic = topic;
- }
-
- public RocketMQResourceInfo() {}
-
- public String getEndpoint() {
- return endpoint;
- }
-
- public void setEndpoint(String endpoint) {
- this.endpoint = endpoint;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public String getNamespace() {
- return namespace;
- }
-
- public void setNamespace(String namespace) {
- this.namespace = namespace;
- }
- }
-
}