This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-a2a.git


The following commit(s) were added to refs/heads/main by this push:
     new 9eb59ae  Optimize the code (#8)
9eb59ae is described below

commit 9eb59aeae4366edea47fc71b6480148e3630fe35
Author: Drizzle <[email protected]>
AuthorDate: Tue Dec 16 09:56:02 2025 +0800

    Optimize the code (#8)
    
    * update
    
    Change-Id: I2b5f6364699c1c9e07b6ad381b801c49ce641559
    
    * optimize the code
    
    Change-Id: If2495360c76970ac88387e59e3ac15a6237e2f3e
    
    * update
    
    Change-Id: Iceef7a1bd1d05bfa498694ba7c3a007bbfdf378e
    
    * update
    
    Change-Id: Id0b75d941f02387d7b05a96731ea936526bae012
    
    * add spring web demo
    
    Change-Id: I74483972366bd35fee4a8e6c58b928204b3669ed
    
    * update example
    
    Change-Id: I00ec96acdbf1b7b5aabad5d28b10667b605635e1
    
    * update
    
    Change-Id: I0f51bf41c079d782a1a3adbbd388a32a50170980
    
    * update
    
    Change-Id: Id0e84eb0f4aa59d7aa020b36a7de85a8583f142d
    
    * update demo
    
    Change-Id: Ic7af56d1b946b119816848725465b60335bfc9ee
    
    * optimzie the code
    
    Change-Id: Ie45c5bbb6d27af2dd2bd2c66e5b0ee18faa91a41
    
    * update
    
    Change-Id: I930f4792680e577d14b6ae5f28264c03e93d987c
    
    * change the version
    
    Change-Id: I24e144b98c8e7ff5939c2ebdf609c8535aa9bce8
    
    ---------
    
    Co-authored-by: drizzle.zk <[email protected]>
---
 example/rocketmq-multiagent-base-adk/README.md     |   4 +-
 .../SupervisorAgent-Web/pom.xml                    |   2 +-
 .../java/org/example/service/AgentService.java     |   2 +-
 .../agent/SupervisorAgentA2ASDKMainStream.java     |   5 +-
 example/rocketmq-multiagent-base-adk/pom.xml       |   2 +-
 .../a2a/server/RocketMQA2AServerRoutes.java        |  14 +--
 .../rocketmq/a2a/transport/RocketMQTransport.java  | 100 ++++++++++-----------
 .../a2a/transport/RocketMQTransportConfig.java     |  38 ++++----
 .../a2a/transport/RocketMQTransportProvider.java   |   2 +-
 9 files changed, 82 insertions(+), 87 deletions(-)

diff --git a/example/rocketmq-multiagent-base-adk/README.md 
b/example/rocketmq-multiagent-base-adk/README.md
index f0383a6..5900c88 100644
--- a/example/rocketmq-multiagent-base-adk/README.md
+++ b/example/rocketmq-multiagent-base-adk/README.md
@@ -44,7 +44,7 @@ cd WeatherAgent
 ```
 
 ```shell
-MAVEN_OPTS="-DrocketMQEndpoint= -DrocketMQNamespace= 
-DbizTopic=WeatherAgentTask -DbizConsumerGroup=WeatherAgentTaskConsumerGroup 
-DrocketMQAk= -DrocketMQSk= -DapiKey= -DappId= " mvn quarkus:dev
+MAVEN_OPTS="-DrocketMQEndpoint= -DrocketMQNamespace= 
-DbizTopic=WeatherAgentTask -DbizConsumerGroup=WeatherAgentTaskConsumerGroup 
-DrocketMQAK= -DrocketMQSK= -DapiKey= -DappId= " mvn quarkus:dev
 ```
 ![img.png](img.png)
 
@@ -54,7 +54,7 @@ cd TravelAgent
 ```
 
 ```shell
- MAVEN_OPTS="-DrocketMQEndpoint= -DrocketMQNamespace= 
-DbizTopic=TravelAgentTask -DbizConsumerGroup=TravelAgentTaskConsumerGroup 
-DrocketMQAk= -DrocketMQSk= -DapiKey= -DappId= " mvn quarkus:dev
+ MAVEN_OPTS="-DrocketMQEndpoint= -DrocketMQNamespace= 
-DbizTopic=TravelAgentTask -DbizConsumerGroup=TravelAgentTaskConsumerGroup 
-DrocketMQAK= -DrocketMQSK= -DapiKey= -DappId= " mvn quarkus:dev
 ```
 ![img_1.png](img_1.png)
 
diff --git a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml 
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml
index 284f0f5..7810d9a 100644
--- a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml
+++ b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/pom.xml
@@ -119,7 +119,7 @@
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-a2a</artifactId>
-            <version>1.0.6</version>
+            <version>1.0.7</version>
         </dependency>
     </dependencies>
 
diff --git 
a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/service/AgentService.java
 
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/service/AgentService.java
index ad19009..512e17e 100644
--- 
a/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/service/AgentService.java
+++ 
b/example/rocketmq-multiagent-base-adk/SupervisorAgent-Web/src/main/java/org/example/service/AgentService.java
@@ -313,7 +313,7 @@ public class AgentService {
         };
         //config rocketmq info
         RocketMQTransportConfig rocketMQTransportConfig = new 
RocketMQTransportConfig();
-        rocketMQTransportConfig.setRocketMQNamespace(ROCKETMQ_NAMESPACE);
+        rocketMQTransportConfig.setNamespace(ROCKETMQ_NAMESPACE);
         rocketMQTransportConfig.setAccessKey(accessKey);
         rocketMQTransportConfig.setSecretKey(secretKey);
         
rocketMQTransportConfig.setWorkAgentResponseGroupID(WORK_AGENT_RESPONSE_GROUP_ID);
diff --git 
a/example/rocketmq-multiagent-base-adk/SupervisorAgent/src/main/java/agent/SupervisorAgentA2ASDKMainStream.java
 
b/example/rocketmq-multiagent-base-adk/SupervisorAgent/src/main/java/agent/SupervisorAgentA2ASDKMainStream.java
index 199a1cd..e41a9b6 100644
--- 
a/example/rocketmq-multiagent-base-adk/SupervisorAgent/src/main/java/agent/SupervisorAgentA2ASDKMainStream.java
+++ 
b/example/rocketmq-multiagent-base-adk/SupervisorAgent/src/main/java/agent/SupervisorAgentA2ASDKMainStream.java
@@ -263,7 +263,7 @@ public class SupervisorAgentA2ASDKMainStream {
             System.err.println("Streaming error occurred: " + 
error.getMessage());
         };
         RocketMQTransportConfig rocketMQTransportConfig = new 
RocketMQTransportConfig();
-        rocketMQTransportConfig.setRocketMQNamespace(ROCKETMQ_NAMESPACE);
+        rocketMQTransportConfig.setNamespace(ROCKETMQ_NAMESPACE);
         rocketMQTransportConfig.setAccessKey(accessKey);
         rocketMQTransportConfig.setSecretKey(secretKey);
         
rocketMQTransportConfig.setWorkAgentResponseGroupID(WORK_AGENT_RESPONSE_GROUP_ID);
@@ -328,11 +328,8 @@ public class SupervisorAgentA2ASDKMainStream {
                     } catch (Exception e) {
                         System.out.println("解析过程出现异常");
                     }
-                } else {
-                    //System.out.println(content);
                 }
             } else {
-                //System.out.println(content);
                 log.debug("Agent 响应: {}", content);
             }
         });
diff --git a/example/rocketmq-multiagent-base-adk/pom.xml 
b/example/rocketmq-multiagent-base-adk/pom.xml
index 4860805..9810267 100644
--- a/example/rocketmq-multiagent-base-adk/pom.xml
+++ b/example/rocketmq-multiagent-base-adk/pom.xml
@@ -81,7 +81,7 @@
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
                 <artifactId>rocketmq-a2a</artifactId>
-                <version>1.0.6</version>
+                <version>1.0.7</version>
             </dependency>
             <dependency>
                 <groupId>io.quarkus</groupId>
diff --git 
a/src/main/java/org/apache/rocketmq/a2a/server/RocketMQA2AServerRoutes.java 
b/src/main/java/org/apache/rocketmq/a2a/server/RocketMQA2AServerRoutes.java
index 8bed7c8..321bc68 100644
--- a/src/main/java/org/apache/rocketmq/a2a/server/RocketMQA2AServerRoutes.java
+++ b/src/main/java/org/apache/rocketmq/a2a/server/RocketMQA2AServerRoutes.java
@@ -97,8 +97,8 @@ public class RocketMQA2AServerRoutes extends A2AServerRoutes {
     private static final String ROCKETMQ_NAMESPACE = 
System.getProperty("rocketMQNamespace", "");
     private static final String BIZ_TOPIC = System.getProperty("bizTopic", "");
     private static final String BIZ_CONSUMER_GROUP = 
System.getProperty("bizConsumerGroup", "");
-    private static final String ACCESS_KEY = System.getProperty("rocketMQAk", 
"");
-    private static final String SECRET_KEY = System.getProperty("rocketMQSk", 
"");
+    private static final String ACCESS_KEY = System.getProperty("rocketMQAK", 
"");
+    private static final String SECRET_KEY = System.getProperty("rocketMQSK", 
"");
 
     @Inject
     JSONRPCHandler jsonRpcHandler;
@@ -241,7 +241,7 @@ public class RocketMQA2AServerRoutes extends 
A2AServerRoutes {
 
     private static Message buildMessage(String topic, String liteTopic, 
RocketMQResponse response) {
         if (StringUtils.isEmpty(topic) || StringUtils.isEmpty(liteTopic)) {
-            log.info("RocketMQA2AServerRoutes buildMessage param error, topic: 
{}, liteTopic: {}, response: {}", topic, liteTopic, 
JSON.toJSONString(response));
+            log.error("RocketMQA2AServerRoutes buildMessage param error, 
topic: {}, liteTopic: {}, response: {}", topic, liteTopic, 
JSON.toJSONString(response));
             return null;
         }
         String missionJsonStr = JSON.toJSONString(response);
@@ -375,7 +375,7 @@ public class RocketMQA2AServerRoutes extends 
A2AServerRoutes {
                         SendReceipt send = 
producer.send(buildMessage(workAgentResponseTopic, liteTopic, response));
                         log.info("MultiSseSupport send response success, 
msgId: {}, time: {}, response: {}", send.getMessageId(), 
System.currentTimeMillis(), JSON.toJSONString(response));
                     } catch (ClientException e) {
-                        log.info("MultiSseSupport error send complete, msgId: 
{}", e.getMessage());
+                        log.error("MultiSseSupport error send complete, msgId: 
{}", e.getMessage());
                     }
                     completableFuture.complete(true);
                 }
@@ -411,13 +411,13 @@ public class RocketMQA2AServerRoutes extends 
A2AServerRoutes {
     private void checkConfigParam() {
         if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT) || 
StringUtils.isEmpty(BIZ_TOPIC) || StringUtils.isEmpty(BIZ_CONSUMER_GROUP)) {
             if (StringUtils.isEmpty(ROCKETMQ_ENDPOINT)) {
-                log.info("rocketMQEndpoint is empty");
+                log.error("rocketMQEndpoint is empty");
             }
             if (StringUtils.isEmpty(BIZ_TOPIC)) {
-                log.info("bizTopic is empty");
+                log.error("bizTopic is empty");
             }
             if (StringUtils.isEmpty(BIZ_CONSUMER_GROUP)) {
-                log.info("bizConsumerGroup is empty");
+                log.error("bizConsumerGroup is empty");
             }
             throw new RuntimeException("RocketMQA2AServerRoutes check init 
rocketmq param error, init failed!!!");
         }
diff --git 
a/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransport.java 
b/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransport.java
index 821ba11..17baa24 100644
--- a/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransport.java
+++ b/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransport.java
@@ -117,8 +117,8 @@ public class RocketMQTransport implements ClientTransport {
     private final String agentTopic;
     private final String accessKey;
     private final String secretKey;
-    private final String rocketMQEndpoint;
-    private final String rocketNamespace;
+    private final String endpoint;
+    private final String namespace;
     private final String workAgentResponseTopic;
     private final String workAgentResponseGroupID;
     private final List<ClientCallInterceptor> interceptors;
@@ -131,7 +131,7 @@ public class RocketMQTransport implements ClientTransport {
     private LitePushConsumer litePushConsumer;
     private Producer producer;
 
-    public RocketMQTransport(String rocketNamespace, String accessKey, String 
secretKey, String workAgentResponseTopic, String workAgentResponseGroupID,
+    public RocketMQTransport(String namespace, String accessKey, String 
secretKey, String workAgentResponseTopic, String workAgentResponseGroupID,
         List<ClientCallInterceptor> interceptors, String agentUrl, 
A2AHttpClient httpClient, String liteTopic, boolean useDefaultRecoverMode, 
AgentCard agentCard) {
         this.accessKey = accessKey;
         this.secretKey = secretKey;
@@ -150,23 +150,23 @@ public class RocketMQTransport implements ClientTransport 
{
         if (null == rocketAgentCardInfo) {
             throw new RuntimeException("RocketMQTransport rocketAgentCardInfo 
pare error");
         }
-        if (null != rocketNamespace && 
!rocketNamespace.equals(rocketAgentCardInfo.getNamespace())) {
+        if (null != namespace && 
!namespace.equals(rocketAgentCardInfo.getNamespace())) {
             throw new RuntimeException("RocketMQTransport rocketAgentCardInfo 
namespace do not match, please check the config info");
         }
-        this.rocketMQEndpoint = rocketAgentCardInfo.getEndpoint();
+        this.endpoint = rocketAgentCardInfo.getEndpoint();
         this.agentTopic = rocketAgentCardInfo.getTopic();
-        this.rocketNamespace = 
StringUtils.isEmpty(rocketAgentCardInfo.getNamespace()) ? "" : 
rocketAgentCardInfo.getNamespace();
-        
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.computeIfAbsent(this.rocketNamespace, k -> 
new HashMap<>()).put(this.liteTopic, useDefaultRecoverMode);
+        this.namespace = 
StringUtils.isEmpty(rocketAgentCardInfo.getNamespace()) ? "" : 
rocketAgentCardInfo.getNamespace();
+        LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.computeIfAbsent(this.namespace, k 
-> new HashMap<>()).put(this.liteTopic, useDefaultRecoverMode);
         checkConfigParam();
         initRocketMQProducerAndConsumer();
     }
 
     private void initRocketMQProducerAndConsumer() {
-        if (StringUtils.isEmpty(this.rocketMQEndpoint) || 
StringUtils.isEmpty(this.workAgentResponseTopic) || 
StringUtils.isEmpty(this.liteTopic)) {
+        if (StringUtils.isEmpty(this.endpoint) || 
StringUtils.isEmpty(this.workAgentResponseTopic) || 
StringUtils.isEmpty(this.liteTopic)) {
             throw new A2AClientException("RocketMQTransport 
initRocketMQProducerAndConsumer param error");
         }
         try {
-            Map<String, LitePushConsumer> consumerMap = 
ROCKETMQ_CONSUMER_MAP.computeIfAbsent(this.rocketNamespace, k -> new 
HashMap<>());
+            Map<String, LitePushConsumer> consumerMap = 
ROCKETMQ_CONSUMER_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
             if (consumerMap.containsKey(this.workAgentResponseTopic)) {
                 this.litePushConsumer = 
consumerMap.get(this.workAgentResponseTopic);
                 this.litePushConsumer.subscribeLite(this.liteTopic);
@@ -184,14 +184,14 @@ public class RocketMQTransport implements ClientTransport 
{
                     this.litePushConsumer = litePushConsumer;
                 }
             }
-            Map<String, Producer> producerMap = 
ROCKETMQ_PRODUCER_MAP.computeIfAbsent(this.rocketNamespace, k -> new 
HashMap<>());
+            Map<String, Producer> producerMap = 
ROCKETMQ_PRODUCER_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
             if (!producerMap.containsKey(this.agentTopic)) {
                 this.producer = buildProducer(this.agentTopic);
                 producerMap.put(this.agentTopic, this.producer);
             }
             log.info("RocketMQTransport initRocketMQProducerAndConsumer 
success");
         } catch (Exception e) {
-            log.info("RocketMQTransport initRocketMQProducerAndConsumer error: 
{}", e.getMessage());
+            log.error("RocketMQTransport initRocketMQProducerAndConsumer 
error: {}", e.getMessage());
         }
     }
 
@@ -210,7 +210,7 @@ public class RocketMQTransport implements ClientTransport {
                 log.error("RocketMQTransport sendMessage error, 
responseMessageId is null");
                 return null;
             }
-            Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new 
HashMap<>());
+            Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
             CompletableFuture<String> objectCompletableFuture = new 
CompletableFuture<>();
             completableFutureMap.put(responseMessageId, 
objectCompletableFuture);
             String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
@@ -241,7 +241,7 @@ public class RocketMQTransport implements ClientTransport {
                 log.error("RocketMQTransport sendMessageStreaming error, 
responseMessageId is null");
                 return;
             }
-            MESSAGE_STREAM_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, 
k -> new HashMap<>()).put(responseMessageId, sseEventListener);
+            MESSAGE_STREAM_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> 
new HashMap<>()).put(responseMessageId, sseEventListener);
             log.info("RocketMQTransport sendMessageStreaming success, 
responseMessageId: {}", responseMessageId);
         } catch (Exception e) {
             throw new A2AClientException("RocketMQTransport Failed to send 
streaming message request: " + e, e);
@@ -258,24 +258,24 @@ public class RocketMQTransport implements ClientTransport 
{
             if (null != request.metadata()) {
                 String responseMessageId = 
(String)request.metadata().get(RocketMQA2AConstant.MESSAGE_RESPONSE_ID);
                 if (!StringUtils.isEmpty(responseMessageId)) {
-                    
MESSAGE_STREAM_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new 
HashMap<>()).put(responseMessageId, sseEventListener);
+                    
MESSAGE_STREAM_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new 
HashMap<>()).put(responseMessageId, sseEventListener);
                 }
                 String liteTopic = 
(String)request.metadata().get(RocketMQA2AConstant.LITE_TOPIC);
                 if (null != litePushConsumer && 
!StringUtils.isEmpty(liteTopic)) {
                     litePushConsumer.subscribeLite(liteTopic);
                     log.info("litePushConsumer subscribeLite liteTopic: {}", 
liteTopic);
-                    
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.computeIfAbsent(this.rocketNamespace, k -> 
new HashMap<>()).put(liteTopic, this.useDefaultRecoverMode);
+                    
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.computeIfAbsent(this.namespace, k -> new 
HashMap<>()).put(liteTopic, this.useDefaultRecoverMode);
                 }
 
                 String closeLiteTopic = 
(String)request.metadata().get(RocketMQA2AConstant.CLOSE_LITE_TOPIC);
                 if (null != litePushConsumer && 
!StringUtils.isEmpty(closeLiteTopic)) {
                     litePushConsumer.unsubscribeLite(closeLiteTopic);
                     log.info("litePushConsumer unsubscribeLite liteTopic: {}", 
closeLiteTopic);
-                    
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.computeIfAbsent(this.rocketNamespace, k -> 
new HashMap<>()).remove(closeLiteTopic);
+                    
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.computeIfAbsent(this.namespace, k -> new 
HashMap<>()).remove(closeLiteTopic);
                 }
             }
             if (this.useDefaultRecoverMode) {
-                
RECOVER_MESSAGE_STREAM_RESPONSE_MAP.computeIfAbsent(rocketNamespace, k -> new 
HashMap<>()).put(RocketMQA2AConstant.DEFAULT_STREAM_RECOVER, sseEventListener);
+                RECOVER_MESSAGE_STREAM_RESPONSE_MAP.computeIfAbsent(namespace, 
k -> new HashMap<>()).put(RocketMQA2AConstant.DEFAULT_STREAM_RECOVER, 
sseEventListener);
             }
         } catch (Exception e) {
             throw new A2AClientException("RocketMQTransport failed to 
resubscribe streaming message request: " + e, e);
@@ -296,7 +296,7 @@ public class RocketMQTransport implements ClientTransport {
                 log.error("RocketMQTransport getTask error, responseMessageId 
is null");
                 return null;
             }
-            Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new 
HashMap<>());
+            Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
             CompletableFuture<String> objectCompletableFuture = new 
CompletableFuture<>();
             completableFutureMap.put(responseMessageId, 
objectCompletableFuture);
             String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
@@ -324,7 +324,7 @@ public class RocketMQTransport implements ClientTransport {
                 log.error("RocketMQTransport cancelTask error, 
responseMessageId is null");
                 return null;
             }
-            Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new 
HashMap<>());
+            Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
             CompletableFuture<String> objectCompletableFuture = new 
CompletableFuture<>();
             completableFutureMap.put(responseMessageId, 
objectCompletableFuture);
             String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
@@ -353,7 +353,7 @@ public class RocketMQTransport implements ClientTransport {
                 log.error("RocketMQTransport 
setTaskPushNotificationConfiguration error, responseMessageId is null");
                 return null;
             }
-            Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new 
HashMap<>());
+            Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
             CompletableFuture<String> objectCompletableFuture = new 
CompletableFuture<>();
             completableFutureMap.put(responseMessageId, 
objectCompletableFuture);
             String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
@@ -383,7 +383,7 @@ public class RocketMQTransport implements ClientTransport {
                 log.error("RocketMQTransport 
getTaskPushNotificationConfiguration error, responseMessageId is null");
                 return null;
             }
-            Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new 
HashMap<>());
+            Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
             CompletableFuture<String> completableFuture = new 
CompletableFuture<>();
             completableFutureMap.put(responseMessageId, completableFuture);
             String result = completableFuture.get(120, TimeUnit.SECONDS);
@@ -411,7 +411,7 @@ public class RocketMQTransport implements ClientTransport {
                 log.error("RocketMQTransport 
listTaskPushNotificationConfigurations error, responseMessageId is null");
                 return null;
             }
-            Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new 
HashMap<>());
+            Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
             CompletableFuture<String> objectCompletableFuture = new 
CompletableFuture<>();
             completableFutureMap.put(responseMessageId, 
objectCompletableFuture);
             String result = objectCompletableFuture.get(120, TimeUnit.SECONDS);
@@ -439,7 +439,7 @@ public class RocketMQTransport implements ClientTransport {
                 log.error("RocketMQTransport 
deleteTaskPushNotificationConfigurations error, responseMessageId is null");
                 return;
             }
-            Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new 
HashMap<>());
+            Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
             CompletableFuture<String> objectCompletableFuture = new 
CompletableFuture<>();
             completableFutureMap.put(responseMessageId, 
objectCompletableFuture);
             objectCompletableFuture.get(120, TimeUnit.SECONDS);
@@ -472,7 +472,7 @@ public class RocketMQTransport implements ClientTransport {
                     log.error("RocketMQTransport getAgentCard 
responseMessageId is null");
                     return null;
                 }
-                Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.rocketNamespace, k -> new 
HashMap<>());
+                Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.computeIfAbsent(this.namespace, k -> new HashMap<>());
                 CompletableFuture<String> objectCompletableFuture = new 
CompletableFuture<>();
                 completableFutureMap.put(responseMessageId, 
objectCompletableFuture);
                 String result = objectCompletableFuture.get(120, 
TimeUnit.SECONDS);
@@ -503,23 +503,23 @@ public class RocketMQTransport implements ClientTransport 
{
     }
 
     private void checkConfigParam() {
-        if (StringUtils.isEmpty(this.rocketMQEndpoint) || 
StringUtils.isEmpty(this.workAgentResponseTopic) ||
+        if (StringUtils.isEmpty(this.endpoint) || 
StringUtils.isEmpty(this.workAgentResponseTopic) ||
             StringUtils.isEmpty(this.workAgentResponseGroupID) || 
StringUtils.isEmpty(this.liteTopic) || StringUtils.isEmpty(agentTopic)) {
 
-            if (StringUtils.isEmpty(this.rocketMQEndpoint)) {
-                log.info("RocketMQTransport checkConfigParam rocketMQEndpoint 
is empty");
+            if (StringUtils.isEmpty(this.endpoint)) {
+                log.error("RocketMQTransport checkConfigParam endpoint is 
empty");
             }
             if (StringUtils.isEmpty(this.workAgentResponseTopic)) {
-                log.info("RocketMQTransport checkConfigParam 
workAgentResponseTopic is empty");
+                log.error("RocketMQTransport checkConfigParam 
workAgentResponseTopic is empty");
             }
             if (StringUtils.isEmpty(this.workAgentResponseGroupID)) {
-                log.info("RocketMQTransport checkConfigParam 
workAgentResponseGroupID is empty");
+                log.error("RocketMQTransport checkConfigParam 
workAgentResponseGroupID is empty");
             }
             if (StringUtils.isEmpty(this.liteTopic)) {
-                log.info("RocketMQTransport checkConfigParam liteTopic is 
empty");
+                log.error("RocketMQTransport checkConfigParam liteTopic is 
empty");
             }
             if (StringUtils.isEmpty(this.agentTopic)) {
-                log.info("RocketMQTransport checkConfigParam agentTopic is 
empty");
+                log.error("RocketMQTransport checkConfigParam agentTopic is 
empty");
             }
             throw new RuntimeException("RocketMQTransport checkConfigParam 
error, init failed !!!");
         }
@@ -539,15 +539,15 @@ public class RocketMQTransport implements ClientTransport 
{
     }
 
     private LitePushConsumer buildConsumer() throws ClientException {
-        if (StringUtils.isEmpty(this.rocketMQEndpoint) || 
StringUtils.isEmpty(this.workAgentResponseGroupID) || 
StringUtils.isEmpty(this.workAgentResponseTopic)) {
+        if (StringUtils.isEmpty(this.endpoint) || 
StringUtils.isEmpty(this.workAgentResponseGroupID) || 
StringUtils.isEmpty(this.workAgentResponseTopic)) {
             log.error("RocketMQTransport buildConsumer check param error");
             return null;
         }
         final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
         SessionCredentialsProvider sessionCredentialsProvider = new 
StaticSessionCredentialsProvider(accessKey, secretKey);
         ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
-            .setEndpoints(this.rocketMQEndpoint)
-            .setNamespace(this.rocketNamespace)
+            .setEndpoints(this.endpoint)
+            .setNamespace(this.namespace)
             .setCredentialProvider(sessionCredentialsProvider)
             .build();
         LitePushConsumer litePushConsumer = 
provider.newLitePushConsumerBuilder()
@@ -571,9 +571,9 @@ public class RocketMQTransport implements ClientTransport {
                         return ConsumeResult.SUCCESS;
                     }
                     if (!response.isStream()) {
-                        return dealNonStreamResult(response, 
this.rocketNamespace);
+                        return dealNonStreamResult(response, this.namespace);
                     }
-                    return dealStreamResult(response, this.rocketNamespace, 
liteTopic);
+                    return dealStreamResult(response, this.namespace, 
liteTopic);
                 } catch (Exception e) {
                     log.error("RocketMQTransport litePushConsumer consumer 
error, msgId: {}, error: {}", messageView.getMessageId(), e.getMessage());
                     return ConsumeResult.SUCCESS;
@@ -582,20 +582,20 @@ public class RocketMQTransport implements ClientTransport 
{
         return litePushConsumer;
     }
 
-    private ConsumeResult dealStreamResult(RocketMQResponse response, String 
rocketMQNamespace, String liteTopic) {
+    private ConsumeResult dealStreamResult(RocketMQResponse response, String 
namespace, String liteTopic) {
         if (null == response || StringUtils.isEmpty(response.getMessageId()) 
|| StringUtils.isEmpty(liteTopic) || !response.isEnd() && 
StringUtils.isEmpty(response.getResponseBody())) {
             log.error("RocketMQTransport dealStreamResult param is error, 
response: {}, liteTopic: {}", JSON.toJSONString(response), liteTopic);
             return ConsumeResult.SUCCESS;
         }
 
-        Map<String, SSEEventListener> sseEventListenerMap = 
MESSAGE_STREAM_RESPONSE_MAP.get(rocketMQNamespace);
+        Map<String, SSEEventListener> sseEventListenerMap = 
MESSAGE_STREAM_RESPONSE_MAP.get(namespace);
         if (null == sseEventListenerMap) {
             log.error("RocketMQTransport dealStreamResult sseEventListenerMap 
is null");
             return ConsumeResult.SUCCESS;
         }
         SSEEventListener sseEventListener = 
sseEventListenerMap.get(response.getMessageId());
         if (null == sseEventListener) {
-            Map<String, Boolean> booleanMap = 
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.get(rocketMQNamespace);
+            Map<String, Boolean> booleanMap = 
LITE_TOPIC_USE_DEFAULT_RECOVER_MAP.get(namespace);
             if (null == booleanMap) {
                 log.error("RocketMQTransport dealStreamResult booleanMap is 
null");
                 return ConsumeResult.SUCCESS;
@@ -604,8 +604,8 @@ public class RocketMQTransport implements ClientTransport {
             if (null == useDefaultRecoverModeConsumer || 
!useDefaultRecoverModeConsumer) {
                 return ConsumeResult.SUCCESS;
             }
-            if (!RECOVER_MESSAGE_STREAM_RESPONSE_MAP.isEmpty() && 
RECOVER_MESSAGE_STREAM_RESPONSE_MAP.containsKey(rocketMQNamespace)) {
-                Map<String, SSEEventListener> sseEventListenerMapRecover = 
RECOVER_MESSAGE_STREAM_RESPONSE_MAP.get(rocketMQNamespace);
+            if (!RECOVER_MESSAGE_STREAM_RESPONSE_MAP.isEmpty() && 
RECOVER_MESSAGE_STREAM_RESPONSE_MAP.containsKey(namespace)) {
+                Map<String, SSEEventListener> sseEventListenerMapRecover = 
RECOVER_MESSAGE_STREAM_RESPONSE_MAP.get(namespace);
                 if (null == sseEventListenerMapRecover) {
                     log.error("RocketMQTransport dealStreamResult 
sseEventListenerMapRecover is null");
                     return ConsumeResult.SUCCESS;
@@ -638,12 +638,12 @@ public class RocketMQTransport implements ClientTransport 
{
         return ConsumeResult.SUCCESS;
     }
 
-    private ConsumeResult dealNonStreamResult(RocketMQResponse response, 
String rocketMQNamespace) {
+    private ConsumeResult dealNonStreamResult(RocketMQResponse response, 
String namespace) {
         if (null == response || StringUtils.isEmpty(response.getMessageId()) 
|| StringUtils.isEmpty(response.getResponseBody())) {
             log.error("RocketMQTransport dealNonStreamResult param is error, 
response: {}", JSON.toJSONString(response));
             return ConsumeResult.SUCCESS;
         }
-        Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.get(rocketMQNamespace);
+        Map<String, CompletableFuture<String>> completableFutureMap = 
MESSAGE_RESPONSE_MAP.get(namespace);
         if (null != completableFutureMap && 
completableFutureMap.containsKey(response.getMessageId())) {
             CompletableFuture<String> completableFuture = 
completableFutureMap.get(response.getMessageId());
             completableFuture.complete(response.getResponseBody());
@@ -715,8 +715,8 @@ public class RocketMQTransport implements ClientTransport {
         final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
         SessionCredentialsProvider sessionCredentialsProvider = new 
StaticSessionCredentialsProvider(accessKey, secretKey);
         ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
-            .setEndpoints(this.rocketMQEndpoint)
-            .setNamespace(this.rocketNamespace)
+            .setEndpoints(this.endpoint)
+            .setNamespace(this.namespace)
             .setCredentialProvider(sessionCredentialsProvider)
             .setRequestTimeout(Duration.ofSeconds(15))
             .build();
@@ -742,7 +742,7 @@ public class RocketMQTransport implements ClientTransport {
 
     private RocketMQResourceInfo parseAgentCardAddition(AgentCard agentCard) {
         if (null == agentCard || 
StringUtils.isEmpty(agentCard.preferredTransport()) || 
StringUtils.isEmpty(agentCard.url()) || null == 
agentCard.additionalInterfaces() || agentCard.additionalInterfaces().isEmpty()) 
{
-            log.info("parseAgentCardAddition param error, agentCard: {}", 
JSON.toJSONString(agentCard));
+            log.error("parseAgentCardAddition param error, agentCard: {}", 
JSON.toJSONString(agentCard));
             return null;
         }
         RocketMQResourceInfo rocketMQResourceInfo = null;
@@ -750,8 +750,7 @@ public class RocketMQTransport implements ClientTransport {
         if (RocketMQA2AConstant.ROCKETMQ_PROTOCOL.equals(preferredTransport)) {
             String url = agentCard.url();
             rocketMQResourceInfo = pareAgentCardUrl(url);
-            if (null != rocketMQResourceInfo && 
!StringUtils.isEmpty(rocketMQResourceInfo.getEndpoint()) && 
!StringUtils.isEmpty(
-                rocketMQResourceInfo.getTopic())) {
+            if (null != rocketMQResourceInfo && 
!StringUtils.isEmpty(rocketMQResourceInfo.getEndpoint()) && 
!StringUtils.isEmpty(rocketMQResourceInfo.getTopic())) {
                 log.info("RocketMQTransport get rocketMQResourceInfo from 
preferredTransport");
                 return rocketMQResourceInfo;
             }
@@ -762,9 +761,8 @@ public class RocketMQTransport implements ClientTransport {
             if (!StringUtils.isEmpty(transport) && 
RocketMQA2AConstant.ROCKETMQ_PROTOCOL.equals(transport)) {
                 String url = agentInterface.url();
                 rocketMQResourceInfo = pareAgentCardUrl(url);
-                if (null != rocketMQResourceInfo && 
!StringUtils.isEmpty(rocketMQResourceInfo.getEndpoint()) && 
!StringUtils.isEmpty(
-                    rocketMQResourceInfo.getTopic())) {
-                    log.info("RocketMQTransport get rocketMQResourceInfo from 
additionalInterfaces");
+                if (null != rocketMQResourceInfo && 
!StringUtils.isEmpty(rocketMQResourceInfo.getEndpoint()) && 
!StringUtils.isEmpty(rocketMQResourceInfo.getTopic())) {
+                    log.error("RocketMQTransport get rocketMQResourceInfo from 
additionalInterfaces");
                     return rocketMQResourceInfo;
                 }
             }
diff --git 
a/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportConfig.java 
b/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportConfig.java
index 49757c6..bdae321 100644
--- 
a/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportConfig.java
+++ 
b/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportConfig.java
@@ -22,8 +22,8 @@ import io.a2a.client.transport.spi.ClientTransportConfig;
 public class RocketMQTransportConfig extends 
ClientTransportConfig<RocketMQTransport> {
     private String accessKey;
     private String secretKey;
-    private String globalEndpoint;
-    private String rocketMQNamespace;
+    private String endpoint;
+    private String namespace;
     private String workAgentResponseTopic;
     private String workAgentResponseGroupID;
     private String agentTopic;
@@ -31,24 +31,24 @@ public class RocketMQTransportConfig extends 
ClientTransportConfig<RocketMQTrans
     private String liteTopic;
     private boolean useDefaultRecoverMode = false;
 
-    public RocketMQTransportConfig(String accessKey, String secretKey, String 
globalEndpoint, String rocketMQNamespace,
+    public RocketMQTransportConfig(String accessKey, String secretKey, String 
endpoint, String namespace,
         String workAgentResponseTopic, String workAgentResponseGroupID, String 
agentTopic, A2AHttpClient httpClient) {
         this.accessKey = accessKey;
         this.secretKey = secretKey;
-        this.globalEndpoint = globalEndpoint;
-        this.rocketMQNamespace = rocketMQNamespace;
+        this.endpoint = endpoint;
+        this.namespace = namespace;
         this.workAgentResponseTopic = workAgentResponseTopic;
         this.workAgentResponseGroupID = workAgentResponseGroupID;
         this.agentTopic = agentTopic;
         this.httpClient = httpClient;
     }
 
-    public RocketMQTransportConfig(String accessKey, String secretKey, String 
globalEndpoint, String rocketMQNamespace,
+    public RocketMQTransportConfig(String accessKey, String secretKey, String 
endpoint, String namespace,
         String workAgentResponseTopic, String workAgentResponseGroupID, String 
agentTopic, A2AHttpClient httpClient, String liteTopic, boolean 
useDefaultRecoverMode) {
         this.accessKey = accessKey;
         this.secretKey = secretKey;
-        this.globalEndpoint = globalEndpoint;
-        this.rocketMQNamespace = rocketMQNamespace;
+        this.endpoint = endpoint;
+        this.namespace = namespace;
         this.workAgentResponseTopic = workAgentResponseTopic;
         this.workAgentResponseGroupID = workAgentResponseGroupID;
         this.agentTopic = agentTopic;
@@ -57,11 +57,11 @@ public class RocketMQTransportConfig extends 
ClientTransportConfig<RocketMQTrans
         this.useDefaultRecoverMode = useDefaultRecoverMode;
     }
 
-    public RocketMQTransportConfig(String accessKey, String secretKey, String 
globalEndpoint, String rocketMQNamespace, String agentTopic, A2AHttpClient 
httpClient) {
+    public RocketMQTransportConfig(String accessKey, String secretKey, String 
endpoint, String namespace, String agentTopic, A2AHttpClient httpClient) {
         this.accessKey = accessKey;
         this.secretKey = secretKey;
-        this.globalEndpoint = globalEndpoint;
-        this.rocketMQNamespace = rocketMQNamespace;
+        this.endpoint = endpoint;
+        this.namespace = namespace;
         this.agentTopic = agentTopic;
         this.httpClient = httpClient;
     }
@@ -88,20 +88,20 @@ public class RocketMQTransportConfig extends 
ClientTransportConfig<RocketMQTrans
         this.secretKey = secretKey;
     }
 
-    public String getGlobalEndpoint() {
-        return globalEndpoint;
+    public String getEndpoint() {
+        return endpoint;
     }
 
-    public void setGlobalEndpoint(String globalEndpoint) {
-        this.globalEndpoint = globalEndpoint;
+    public void setEndpoint(String endpoint) {
+        this.endpoint = endpoint;
     }
 
-    public String getRocketMQNamespace() {
-        return rocketMQNamespace;
+    public String getNamespace() {
+        return namespace;
     }
 
-    public void setRocketMQNamespace(String rocketMQNamespace) {
-        this.rocketMQNamespace = rocketMQNamespace;
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
     }
 
     public String getWorkAgentResponseTopic() {
diff --git 
a/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportProvider.java
 
b/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportProvider.java
index 2b98291..ddf6ca5 100644
--- 
a/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportProvider.java
+++ 
b/src/main/java/org/apache/rocketmq/a2a/transport/RocketMQTransportProvider.java
@@ -30,7 +30,7 @@ public class RocketMQTransportProvider implements 
ClientTransportProvider<Rocket
         if (clientTransportConfig == null) {
             clientTransportConfig = new RocketMQTransportConfig(new 
JdkA2AHttpClient());
         }
-        return new 
RocketMQTransport(clientTransportConfig.getRocketMQNamespace(), 
clientTransportConfig.getAccessKey(), clientTransportConfig.getSecretKey(), 
clientTransportConfig.getWorkAgentResponseTopic(), 
clientTransportConfig.getWorkAgentResponseGroupID(), 
clientTransportConfig.getInterceptors(), clientTransportConfig.getAgentUrl(), 
clientTransportConfig.getHttpClient(), clientTransportConfig.getLiteTopic(), 
clientTransportConfig.isUseDefaultRecoverMode(), agentCard);
+        return new RocketMQTransport(clientTransportConfig.getNamespace(), 
clientTransportConfig.getAccessKey(), clientTransportConfig.getSecretKey(), 
clientTransportConfig.getWorkAgentResponseTopic(), 
clientTransportConfig.getWorkAgentResponseGroupID(), 
clientTransportConfig.getInterceptors(), clientTransportConfig.getAgentUrl(), 
clientTransportConfig.getHttpClient(), clientTransportConfig.getLiteTopic(), 
clientTransportConfig.isUseDefaultRecoverMode(), agentCard);
     }
 
     @Override


Reply via email to