Alonexc commented on code in PR #3691:
URL: https://github.com/apache/eventmesh/pull/3691#discussion_r1162428628


##########
eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java:
##########
@@ -49,120 +51,101 @@ public class EventMeshTestUtils {
 
     private static final String DEFAULT_TTL_MS = "30000";
 
+
+    private static UserAgent getUserAgent(Integer port, String subsystem, 
Integer pid){
+        return UserAgent.builder()
+        .env(UtilsConstants.ENV)
+        .host(UtilsConstants.HOST)
+        .password(generateRandomString(UtilsConstants.PASSWORD_LENGTH))
+        .username(UtilsConstants.USER_NAME)
+        .group(UtilsConstants.GROUP)
+        .path(UtilsConstants.PATH)
+        .port(port)
+        .subsystem(subsystem)
+        .pid(pid)
+        .version(UtilsConstants.VERSION)
+        .idc(UtilsConstants.IDC)
+        .build();
+    }
+     
+
     // generate pub-client
-    public static UserAgent generateClient1() {
-        final UserAgent agent = UserAgent.builder()
-            .env(UtilsConstants.ENV)
-            .host(UtilsConstants.HOST)
-            .password(generateRandomString(UtilsConstants.PASSWORD_LENGTH))
-            .username(UtilsConstants.USER_NAME)
-            .group(UtilsConstants.GROUP)
-            .path(UtilsConstants.PATH)
-            .port(UtilsConstants.PORT_1)
-            .subsystem(UtilsConstants.SUB_SYSTEM_1)
-            .pid(UtilsConstants.PID_1)
-            .version(UtilsConstants.VERSION)
-            .idc(UtilsConstants.IDC)
-            .build();
+        public static UserAgent generateClient1() {
+        final UserAgent agent = getUserAgent(UtilsConstants.PORT_1, 
UtilsConstants.SUB_SYSTEM_1, UtilsConstants.PID_1);
         return MessageUtils.generatePubClient(agent);
     }
 
     // generate sub-client
     public static UserAgent generateClient2() {
-        final UserAgent agent = UserAgent.builder()
-            .env(UtilsConstants.ENV)
-            .host(UtilsConstants.HOST)
-            .password(generateRandomString(UtilsConstants.PASSWORD_LENGTH))
-            .username(UtilsConstants.USER_NAME)
-            .group(UtilsConstants.GROUP)
-            .path(UtilsConstants.PATH)
-            .port(UtilsConstants.PORT_2)
-            .subsystem(UtilsConstants.SUB_SYSTEM_2)
-            .pid(UtilsConstants.PID_2)
-            .version(UtilsConstants.VERSION)
-            .idc(UtilsConstants.IDC)
-            .build();
+        final UserAgent agent = getUserAgent(UtilsConstants.PORT_2, 
UtilsConstants.SUB_SYSTEM_2, UtilsConstants.PID_2);
         return MessageUtils.generateSubClient(agent);
     }
 
-    public static Package syncRR() {
+    private static Package getPackageMsg(Command requestToServer, 
EventMeshMessage eventMeshMessage){
         final Package msg = new Package();
-        msg.setHeader(new Header(Command.REQUEST_TO_SERVER, 0, null, 
generateRandomString(SEQ_LENGTH)));
-        msg.setBody(generateSyncRRMqMsg());
+        msg.setHeader(new Header(requestToServer, 0, null, 
generateRandomString(SEQ_LENGTH)));
+        msg.setBody(eventMeshMessage);
         return msg;
     }
 
+    public static Package syncRR() {
+        return getPackageMsg(Command.REQUEST_TO_SERVER, generateSyncRRMqMsg());
+    }
+
     public static Package asyncRR() {
-        final Package msg = new Package();
-        msg.setHeader(new Header(Command.REQUEST_TO_SERVER, 0, null, 
generateRandomString(SEQ_LENGTH)));
-        msg.setBody(generateAsyncRRMqMsg());
-        return msg;
+        return getPackageMsg(Command.REQUEST_TO_SERVER, 
generateAsyncRRMqMsg());
     }
 
     public static Package asyncMessage() {
-        final Package msg = new Package();
-        msg.setHeader(new Header(Command.ASYNC_MESSAGE_TO_SERVER, 0, null, 
generateRandomString(SEQ_LENGTH)));
-        msg.setBody(generateAsyncEventMqMsg());
-        return msg;
+        return getPackageMsg(Command.ASYNC_MESSAGE_TO_SERVER, 
generateAsyncEventMqMsg());
     }
 
     public static Package broadcastMessage() {
-        final Package msg = new Package();
-        msg.setHeader(new Header(Command.BROADCAST_MESSAGE_TO_SERVER, 0, null, 
generateRandomString(SEQ_LENGTH)));
-        msg.setBody(generateBroadcastMqMsg());
-        return msg;
+        return getPackageMsg(Command.BROADCAST_MESSAGE_TO_SERVER, 
generateBroadcastMqMsg());
     }
 
     public static Package rrResponse(final EventMeshMessage request) {
-        final Package msg = new Package();
-        msg.setHeader(new Header(RESPONSE_TO_SERVER, 0, null, 
generateRandomString(SEQ_LENGTH)));
-        msg.setBody(request);
-        return msg;
+        return getPackageMsg(RESPONSE_TO_SERVER, request);
     }
 
+    public static EventMeshMessage getEventMeshMessage(String 
eventMeshTcpSyncTestTopic, String msgType, String msg, String keys, 
+                                                       String keyMsg, String 
testMessage){
+
+        final EventMeshMessage mqmsg = new EventMeshMessage();
+        mqmsg.setTopic(eventMeshTcpSyncTestTopic);
+        mqmsg.getProperties().put(msgType, msg);
+        mqmsg.getProperties().put(UtilsConstants.TTL, DEFAULT_TTL_MS);
+        mqmsg.getProperties().put(keys,keyMsg);
+        mqmsg.getHeaders().put(Constants.DATA_CONTENT_TYPE, "text/plain");
+        mqmsg.setBody(testMessage);
+        return mqmsg ;
+         
+
+                                                       }
+
     public static EventMeshMessage generateSyncRRMqMsg() {
-        final EventMeshMessage mqMsg = new EventMeshMessage();
-        mqMsg.setTopic(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC);
-        mqMsg.getProperties().put(UtilsConstants.MSG_TYPE, "persistent");
-        mqMsg.getProperties().put(UtilsConstants.TTL, DEFAULT_TTL_MS);
-        mqMsg.getProperties().put(UtilsConstants.KEYS, 
generateRandomString(16));
-        mqMsg.getHeaders().put(Constants.DATA_CONTENT_TYPE, "text/plain");
-        mqMsg.setBody("testSyncRR");
-        return mqMsg;
+        
+        return EventMeshMessage(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC 
, UtilsConstants.MSG_TYPE , "persistent",UtilsConstants.KEYS, 
generateRandomString(16),

Review Comment:
   Here it should be the `getEventMeshMessage()` method.



##########
eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java:
##########
@@ -49,120 +51,101 @@ public class EventMeshTestUtils {
 
     private static final String DEFAULT_TTL_MS = "30000";
 
+
+    private static UserAgent getUserAgent(Integer port, String subsystem, 
Integer pid){
+        return UserAgent.builder()
+        .env(UtilsConstants.ENV)
+        .host(UtilsConstants.HOST)
+        .password(generateRandomString(UtilsConstants.PASSWORD_LENGTH))
+        .username(UtilsConstants.USER_NAME)
+        .group(UtilsConstants.GROUP)
+        .path(UtilsConstants.PATH)
+        .port(port)
+        .subsystem(subsystem)
+        .pid(pid)
+        .version(UtilsConstants.VERSION)
+        .idc(UtilsConstants.IDC)
+        .build();
+    }
+     
+
     // generate pub-client
-    public static UserAgent generateClient1() {
-        final UserAgent agent = UserAgent.builder()
-            .env(UtilsConstants.ENV)
-            .host(UtilsConstants.HOST)
-            .password(generateRandomString(UtilsConstants.PASSWORD_LENGTH))
-            .username(UtilsConstants.USER_NAME)
-            .group(UtilsConstants.GROUP)
-            .path(UtilsConstants.PATH)
-            .port(UtilsConstants.PORT_1)
-            .subsystem(UtilsConstants.SUB_SYSTEM_1)
-            .pid(UtilsConstants.PID_1)
-            .version(UtilsConstants.VERSION)
-            .idc(UtilsConstants.IDC)
-            .build();
+        public static UserAgent generateClient1() {
+        final UserAgent agent = getUserAgent(UtilsConstants.PORT_1, 
UtilsConstants.SUB_SYSTEM_1, UtilsConstants.PID_1);
         return MessageUtils.generatePubClient(agent);
     }
 
     // generate sub-client
     public static UserAgent generateClient2() {
-        final UserAgent agent = UserAgent.builder()
-            .env(UtilsConstants.ENV)
-            .host(UtilsConstants.HOST)
-            .password(generateRandomString(UtilsConstants.PASSWORD_LENGTH))
-            .username(UtilsConstants.USER_NAME)
-            .group(UtilsConstants.GROUP)
-            .path(UtilsConstants.PATH)
-            .port(UtilsConstants.PORT_2)
-            .subsystem(UtilsConstants.SUB_SYSTEM_2)
-            .pid(UtilsConstants.PID_2)
-            .version(UtilsConstants.VERSION)
-            .idc(UtilsConstants.IDC)
-            .build();
+        final UserAgent agent = getUserAgent(UtilsConstants.PORT_2, 
UtilsConstants.SUB_SYSTEM_2, UtilsConstants.PID_2);
         return MessageUtils.generateSubClient(agent);
     }
 
-    public static Package syncRR() {
+    private static Package getPackageMsg(Command requestToServer, 
EventMeshMessage eventMeshMessage){
         final Package msg = new Package();
-        msg.setHeader(new Header(Command.REQUEST_TO_SERVER, 0, null, 
generateRandomString(SEQ_LENGTH)));
-        msg.setBody(generateSyncRRMqMsg());
+        msg.setHeader(new Header(requestToServer, 0, null, 
generateRandomString(SEQ_LENGTH)));
+        msg.setBody(eventMeshMessage);
         return msg;
     }
 
+    public static Package syncRR() {
+        return getPackageMsg(Command.REQUEST_TO_SERVER, generateSyncRRMqMsg());
+    }
+
     public static Package asyncRR() {
-        final Package msg = new Package();
-        msg.setHeader(new Header(Command.REQUEST_TO_SERVER, 0, null, 
generateRandomString(SEQ_LENGTH)));
-        msg.setBody(generateAsyncRRMqMsg());
-        return msg;
+        return getPackageMsg(Command.REQUEST_TO_SERVER, 
generateAsyncRRMqMsg());
     }
 
     public static Package asyncMessage() {
-        final Package msg = new Package();
-        msg.setHeader(new Header(Command.ASYNC_MESSAGE_TO_SERVER, 0, null, 
generateRandomString(SEQ_LENGTH)));
-        msg.setBody(generateAsyncEventMqMsg());
-        return msg;
+        return getPackageMsg(Command.ASYNC_MESSAGE_TO_SERVER, 
generateAsyncEventMqMsg());
     }
 
     public static Package broadcastMessage() {
-        final Package msg = new Package();
-        msg.setHeader(new Header(Command.BROADCAST_MESSAGE_TO_SERVER, 0, null, 
generateRandomString(SEQ_LENGTH)));
-        msg.setBody(generateBroadcastMqMsg());
-        return msg;
+        return getPackageMsg(Command.BROADCAST_MESSAGE_TO_SERVER, 
generateBroadcastMqMsg());
     }
 
     public static Package rrResponse(final EventMeshMessage request) {
-        final Package msg = new Package();
-        msg.setHeader(new Header(RESPONSE_TO_SERVER, 0, null, 
generateRandomString(SEQ_LENGTH)));
-        msg.setBody(request);
-        return msg;
+        return getPackageMsg(RESPONSE_TO_SERVER, request);
     }
 
+    public static EventMeshMessage getEventMeshMessage(String 
eventMeshTcpSyncTestTopic, String msgType, String msg, String keys, 
+                                                       String keyMsg, String 
testMessage){
+
+        final EventMeshMessage mqmsg = new EventMeshMessage();
+        mqmsg.setTopic(eventMeshTcpSyncTestTopic);
+        mqmsg.getProperties().put(msgType, msg);
+        mqmsg.getProperties().put(UtilsConstants.TTL, DEFAULT_TTL_MS);
+        mqmsg.getProperties().put(keys,keyMsg);
+        mqmsg.getHeaders().put(Constants.DATA_CONTENT_TYPE, "text/plain");
+        mqmsg.setBody(testMessage);
+        return mqmsg ;
+         
+
+                                                       }
+
     public static EventMeshMessage generateSyncRRMqMsg() {
-        final EventMeshMessage mqMsg = new EventMeshMessage();
-        mqMsg.setTopic(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC);
-        mqMsg.getProperties().put(UtilsConstants.MSG_TYPE, "persistent");
-        mqMsg.getProperties().put(UtilsConstants.TTL, DEFAULT_TTL_MS);
-        mqMsg.getProperties().put(UtilsConstants.KEYS, 
generateRandomString(16));
-        mqMsg.getHeaders().put(Constants.DATA_CONTENT_TYPE, "text/plain");
-        mqMsg.setBody("testSyncRR");
-        return mqMsg;
+        
+        return EventMeshMessage(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC 
, UtilsConstants.MSG_TYPE , "persistent",UtilsConstants.KEYS, 
generateRandomString(16),
+        "testSyncRR");
     }
 
 
     private static EventMeshMessage generateAsyncRRMqMsg() {
-        final EventMeshMessage mqMsg = new EventMeshMessage();
-        mqMsg.setTopic(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC);
-        mqMsg.getProperties().put(UtilsConstants.REPLY_TO, 
"localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI");
-        mqMsg.getProperties().put(UtilsConstants.TTL, DEFAULT_TTL_MS);
-        mqMsg.getProperties().put(UtilsConstants.PROPERTY_MESSAGE_REPLY_TO, 
"notnull");
-        mqMsg.getHeaders().put(Constants.DATA_CONTENT_TYPE, "text/plain");
-        mqMsg.setBody("testAsyncRR");
-        return mqMsg;
+        return 
EventMeshMessage(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC, 
UtilsConstants.REPLY_TO , 
"localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI" , 

Review Comment:
   Here it should be the `getEventMeshMessage()` method.



##########
eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java:
##########
@@ -49,120 +51,101 @@ public class EventMeshTestUtils {
 
     private static final String DEFAULT_TTL_MS = "30000";
 
+
+    private static UserAgent getUserAgent(Integer port, String subsystem, 
Integer pid){
+        return UserAgent.builder()
+        .env(UtilsConstants.ENV)
+        .host(UtilsConstants.HOST)
+        .password(generateRandomString(UtilsConstants.PASSWORD_LENGTH))
+        .username(UtilsConstants.USER_NAME)
+        .group(UtilsConstants.GROUP)
+        .path(UtilsConstants.PATH)
+        .port(port)
+        .subsystem(subsystem)
+        .pid(pid)
+        .version(UtilsConstants.VERSION)
+        .idc(UtilsConstants.IDC)
+        .build();
+    }
+     
+
     // generate pub-client
-    public static UserAgent generateClient1() {
-        final UserAgent agent = UserAgent.builder()
-            .env(UtilsConstants.ENV)
-            .host(UtilsConstants.HOST)
-            .password(generateRandomString(UtilsConstants.PASSWORD_LENGTH))
-            .username(UtilsConstants.USER_NAME)
-            .group(UtilsConstants.GROUP)
-            .path(UtilsConstants.PATH)
-            .port(UtilsConstants.PORT_1)
-            .subsystem(UtilsConstants.SUB_SYSTEM_1)
-            .pid(UtilsConstants.PID_1)
-            .version(UtilsConstants.VERSION)
-            .idc(UtilsConstants.IDC)
-            .build();
+        public static UserAgent generateClient1() {
+        final UserAgent agent = getUserAgent(UtilsConstants.PORT_1, 
UtilsConstants.SUB_SYSTEM_1, UtilsConstants.PID_1);
         return MessageUtils.generatePubClient(agent);
     }
 
     // generate sub-client
     public static UserAgent generateClient2() {
-        final UserAgent agent = UserAgent.builder()
-            .env(UtilsConstants.ENV)
-            .host(UtilsConstants.HOST)
-            .password(generateRandomString(UtilsConstants.PASSWORD_LENGTH))
-            .username(UtilsConstants.USER_NAME)
-            .group(UtilsConstants.GROUP)
-            .path(UtilsConstants.PATH)
-            .port(UtilsConstants.PORT_2)
-            .subsystem(UtilsConstants.SUB_SYSTEM_2)
-            .pid(UtilsConstants.PID_2)
-            .version(UtilsConstants.VERSION)
-            .idc(UtilsConstants.IDC)
-            .build();
+        final UserAgent agent = getUserAgent(UtilsConstants.PORT_2, 
UtilsConstants.SUB_SYSTEM_2, UtilsConstants.PID_2);
         return MessageUtils.generateSubClient(agent);
     }
 
-    public static Package syncRR() {
+    private static Package getPackageMsg(Command requestToServer, 
EventMeshMessage eventMeshMessage){
         final Package msg = new Package();
-        msg.setHeader(new Header(Command.REQUEST_TO_SERVER, 0, null, 
generateRandomString(SEQ_LENGTH)));
-        msg.setBody(generateSyncRRMqMsg());
+        msg.setHeader(new Header(requestToServer, 0, null, 
generateRandomString(SEQ_LENGTH)));
+        msg.setBody(eventMeshMessage);
         return msg;
     }
 
+    public static Package syncRR() {
+        return getPackageMsg(Command.REQUEST_TO_SERVER, generateSyncRRMqMsg());
+    }
+
     public static Package asyncRR() {
-        final Package msg = new Package();
-        msg.setHeader(new Header(Command.REQUEST_TO_SERVER, 0, null, 
generateRandomString(SEQ_LENGTH)));
-        msg.setBody(generateAsyncRRMqMsg());
-        return msg;
+        return getPackageMsg(Command.REQUEST_TO_SERVER, 
generateAsyncRRMqMsg());
     }
 
     public static Package asyncMessage() {
-        final Package msg = new Package();
-        msg.setHeader(new Header(Command.ASYNC_MESSAGE_TO_SERVER, 0, null, 
generateRandomString(SEQ_LENGTH)));
-        msg.setBody(generateAsyncEventMqMsg());
-        return msg;
+        return getPackageMsg(Command.ASYNC_MESSAGE_TO_SERVER, 
generateAsyncEventMqMsg());
     }
 
     public static Package broadcastMessage() {
-        final Package msg = new Package();
-        msg.setHeader(new Header(Command.BROADCAST_MESSAGE_TO_SERVER, 0, null, 
generateRandomString(SEQ_LENGTH)));
-        msg.setBody(generateBroadcastMqMsg());
-        return msg;
+        return getPackageMsg(Command.BROADCAST_MESSAGE_TO_SERVER, 
generateBroadcastMqMsg());
     }
 
     public static Package rrResponse(final EventMeshMessage request) {
-        final Package msg = new Package();
-        msg.setHeader(new Header(RESPONSE_TO_SERVER, 0, null, 
generateRandomString(SEQ_LENGTH)));
-        msg.setBody(request);
-        return msg;
+        return getPackageMsg(RESPONSE_TO_SERVER, request);
     }
 
+    public static EventMeshMessage getEventMeshMessage(String 
eventMeshTcpSyncTestTopic, String msgType, String msg, String keys, 
+                                                       String keyMsg, String 
testMessage){
+
+        final EventMeshMessage mqmsg = new EventMeshMessage();
+        mqmsg.setTopic(eventMeshTcpSyncTestTopic);
+        mqmsg.getProperties().put(msgType, msg);
+        mqmsg.getProperties().put(UtilsConstants.TTL, DEFAULT_TTL_MS);
+        mqmsg.getProperties().put(keys,keyMsg);
+        mqmsg.getHeaders().put(Constants.DATA_CONTENT_TYPE, "text/plain");
+        mqmsg.setBody(testMessage);
+        return mqmsg ;
+         
+
+                                                       }
+
     public static EventMeshMessage generateSyncRRMqMsg() {
-        final EventMeshMessage mqMsg = new EventMeshMessage();
-        mqMsg.setTopic(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC);
-        mqMsg.getProperties().put(UtilsConstants.MSG_TYPE, "persistent");
-        mqMsg.getProperties().put(UtilsConstants.TTL, DEFAULT_TTL_MS);
-        mqMsg.getProperties().put(UtilsConstants.KEYS, 
generateRandomString(16));
-        mqMsg.getHeaders().put(Constants.DATA_CONTENT_TYPE, "text/plain");
-        mqMsg.setBody("testSyncRR");
-        return mqMsg;
+        
+        return EventMeshMessage(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC 
, UtilsConstants.MSG_TYPE , "persistent",UtilsConstants.KEYS, 
generateRandomString(16),
+        "testSyncRR");
     }
 
 
     private static EventMeshMessage generateAsyncRRMqMsg() {
-        final EventMeshMessage mqMsg = new EventMeshMessage();
-        mqMsg.setTopic(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC);
-        mqMsg.getProperties().put(UtilsConstants.REPLY_TO, 
"localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI");
-        mqMsg.getProperties().put(UtilsConstants.TTL, DEFAULT_TTL_MS);
-        mqMsg.getProperties().put(UtilsConstants.PROPERTY_MESSAGE_REPLY_TO, 
"notnull");
-        mqMsg.getHeaders().put(Constants.DATA_CONTENT_TYPE, "text/plain");
-        mqMsg.setBody("testAsyncRR");
-        return mqMsg;
+        return 
EventMeshMessage(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC, 
UtilsConstants.REPLY_TO , 
"localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI" , 
+        UtilsConstants.PROPERTY_MESSAGE_REPLY_TO ,"notnull",
+        "testAsyncRR");
     }
 
     public static EventMeshMessage generateAsyncEventMqMsg() {
-        final EventMeshMessage mqMsg = new EventMeshMessage();
-        mqMsg.setTopic(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC);
-        mqMsg.getProperties().put(UtilsConstants.REPLY_TO, 
"localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI");
-        mqMsg.getProperties().put(UtilsConstants.TTL, DEFAULT_TTL_MS);
-        mqMsg.getProperties().put(UtilsConstants.PROPERTY_MESSAGE_REPLY_TO, 
"notnull");
-        mqMsg.getHeaders().put(Constants.DATA_CONTENT_TYPE, "text/plain");
-        mqMsg.setBody(ASYNC_MSG_BODY);
-        return mqMsg;
+        return 
EventMeshMessage(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC, 
UtilsConstants.REPLY_TO , 
"localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI" , 

Review Comment:
   Here it should be the `getEventMeshMessage()` method.



##########
eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java:
##########
@@ -49,120 +51,101 @@ public class EventMeshTestUtils {
 
     private static final String DEFAULT_TTL_MS = "30000";
 
+
+    private static UserAgent getUserAgent(Integer port, String subsystem, 
Integer pid){
+        return UserAgent.builder()
+        .env(UtilsConstants.ENV)
+        .host(UtilsConstants.HOST)
+        .password(generateRandomString(UtilsConstants.PASSWORD_LENGTH))
+        .username(UtilsConstants.USER_NAME)
+        .group(UtilsConstants.GROUP)
+        .path(UtilsConstants.PATH)
+        .port(port)
+        .subsystem(subsystem)
+        .pid(pid)
+        .version(UtilsConstants.VERSION)
+        .idc(UtilsConstants.IDC)
+        .build();
+    }
+     
+
     // generate pub-client
-    public static UserAgent generateClient1() {
-        final UserAgent agent = UserAgent.builder()
-            .env(UtilsConstants.ENV)
-            .host(UtilsConstants.HOST)
-            .password(generateRandomString(UtilsConstants.PASSWORD_LENGTH))
-            .username(UtilsConstants.USER_NAME)
-            .group(UtilsConstants.GROUP)
-            .path(UtilsConstants.PATH)
-            .port(UtilsConstants.PORT_1)
-            .subsystem(UtilsConstants.SUB_SYSTEM_1)
-            .pid(UtilsConstants.PID_1)
-            .version(UtilsConstants.VERSION)
-            .idc(UtilsConstants.IDC)
-            .build();
+        public static UserAgent generateClient1() {
+        final UserAgent agent = getUserAgent(UtilsConstants.PORT_1, 
UtilsConstants.SUB_SYSTEM_1, UtilsConstants.PID_1);
         return MessageUtils.generatePubClient(agent);
     }
 
     // generate sub-client
     public static UserAgent generateClient2() {
-        final UserAgent agent = UserAgent.builder()
-            .env(UtilsConstants.ENV)
-            .host(UtilsConstants.HOST)
-            .password(generateRandomString(UtilsConstants.PASSWORD_LENGTH))
-            .username(UtilsConstants.USER_NAME)
-            .group(UtilsConstants.GROUP)
-            .path(UtilsConstants.PATH)
-            .port(UtilsConstants.PORT_2)
-            .subsystem(UtilsConstants.SUB_SYSTEM_2)
-            .pid(UtilsConstants.PID_2)
-            .version(UtilsConstants.VERSION)
-            .idc(UtilsConstants.IDC)
-            .build();
+        final UserAgent agent = getUserAgent(UtilsConstants.PORT_2, 
UtilsConstants.SUB_SYSTEM_2, UtilsConstants.PID_2);
         return MessageUtils.generateSubClient(agent);
     }
 
-    public static Package syncRR() {
+    private static Package getPackageMsg(Command requestToServer, 
EventMeshMessage eventMeshMessage){
         final Package msg = new Package();
-        msg.setHeader(new Header(Command.REQUEST_TO_SERVER, 0, null, 
generateRandomString(SEQ_LENGTH)));
-        msg.setBody(generateSyncRRMqMsg());
+        msg.setHeader(new Header(requestToServer, 0, null, 
generateRandomString(SEQ_LENGTH)));
+        msg.setBody(eventMeshMessage);
         return msg;
     }
 
+    public static Package syncRR() {
+        return getPackageMsg(Command.REQUEST_TO_SERVER, generateSyncRRMqMsg());
+    }
+
     public static Package asyncRR() {
-        final Package msg = new Package();
-        msg.setHeader(new Header(Command.REQUEST_TO_SERVER, 0, null, 
generateRandomString(SEQ_LENGTH)));
-        msg.setBody(generateAsyncRRMqMsg());
-        return msg;
+        return getPackageMsg(Command.REQUEST_TO_SERVER, 
generateAsyncRRMqMsg());
     }
 
     public static Package asyncMessage() {
-        final Package msg = new Package();
-        msg.setHeader(new Header(Command.ASYNC_MESSAGE_TO_SERVER, 0, null, 
generateRandomString(SEQ_LENGTH)));
-        msg.setBody(generateAsyncEventMqMsg());
-        return msg;
+        return getPackageMsg(Command.ASYNC_MESSAGE_TO_SERVER, 
generateAsyncEventMqMsg());
     }
 
     public static Package broadcastMessage() {
-        final Package msg = new Package();
-        msg.setHeader(new Header(Command.BROADCAST_MESSAGE_TO_SERVER, 0, null, 
generateRandomString(SEQ_LENGTH)));
-        msg.setBody(generateBroadcastMqMsg());
-        return msg;
+        return getPackageMsg(Command.BROADCAST_MESSAGE_TO_SERVER, 
generateBroadcastMqMsg());
     }
 
     public static Package rrResponse(final EventMeshMessage request) {
-        final Package msg = new Package();
-        msg.setHeader(new Header(RESPONSE_TO_SERVER, 0, null, 
generateRandomString(SEQ_LENGTH)));
-        msg.setBody(request);
-        return msg;
+        return getPackageMsg(RESPONSE_TO_SERVER, request);
     }
 
+    public static EventMeshMessage getEventMeshMessage(String 
eventMeshTcpSyncTestTopic, String msgType, String msg, String keys, 
+                                                       String keyMsg, String 
testMessage){
+
+        final EventMeshMessage mqmsg = new EventMeshMessage();
+        mqmsg.setTopic(eventMeshTcpSyncTestTopic);
+        mqmsg.getProperties().put(msgType, msg);
+        mqmsg.getProperties().put(UtilsConstants.TTL, DEFAULT_TTL_MS);
+        mqmsg.getProperties().put(keys,keyMsg);
+        mqmsg.getHeaders().put(Constants.DATA_CONTENT_TYPE, "text/plain");
+        mqmsg.setBody(testMessage);
+        return mqmsg ;
+         
+
+                                                       }
+
     public static EventMeshMessage generateSyncRRMqMsg() {
-        final EventMeshMessage mqMsg = new EventMeshMessage();
-        mqMsg.setTopic(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC);
-        mqMsg.getProperties().put(UtilsConstants.MSG_TYPE, "persistent");
-        mqMsg.getProperties().put(UtilsConstants.TTL, DEFAULT_TTL_MS);
-        mqMsg.getProperties().put(UtilsConstants.KEYS, 
generateRandomString(16));
-        mqMsg.getHeaders().put(Constants.DATA_CONTENT_TYPE, "text/plain");
-        mqMsg.setBody("testSyncRR");
-        return mqMsg;
+        
+        return EventMeshMessage(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC 
, UtilsConstants.MSG_TYPE , "persistent",UtilsConstants.KEYS, 
generateRandomString(16),
+        "testSyncRR");
     }
 
 
     private static EventMeshMessage generateAsyncRRMqMsg() {
-        final EventMeshMessage mqMsg = new EventMeshMessage();
-        mqMsg.setTopic(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC);
-        mqMsg.getProperties().put(UtilsConstants.REPLY_TO, 
"localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI");
-        mqMsg.getProperties().put(UtilsConstants.TTL, DEFAULT_TTL_MS);
-        mqMsg.getProperties().put(UtilsConstants.PROPERTY_MESSAGE_REPLY_TO, 
"notnull");
-        mqMsg.getHeaders().put(Constants.DATA_CONTENT_TYPE, "text/plain");
-        mqMsg.setBody("testAsyncRR");
-        return mqMsg;
+        return 
EventMeshMessage(ExampleConstants.EVENTMESH_TCP_SYNC_TEST_TOPIC, 
UtilsConstants.REPLY_TO , 
"localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI" , 
+        UtilsConstants.PROPERTY_MESSAGE_REPLY_TO ,"notnull",
+        "testAsyncRR");
     }
 
     public static EventMeshMessage generateAsyncEventMqMsg() {
-        final EventMeshMessage mqMsg = new EventMeshMessage();
-        mqMsg.setTopic(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC);
-        mqMsg.getProperties().put(UtilsConstants.REPLY_TO, 
"localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI");
-        mqMsg.getProperties().put(UtilsConstants.TTL, DEFAULT_TTL_MS);
-        mqMsg.getProperties().put(UtilsConstants.PROPERTY_MESSAGE_REPLY_TO, 
"notnull");
-        mqMsg.getHeaders().put(Constants.DATA_CONTENT_TYPE, "text/plain");
-        mqMsg.setBody(ASYNC_MSG_BODY);
-        return mqMsg;
+        return 
EventMeshMessage(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC, 
UtilsConstants.REPLY_TO , 
"localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI" , 
+        UtilsConstants.PROPERTY_MESSAGE_REPLY_TO ,"notnull",
+        ASYNC_MSG_BODY);
     }
 
     public static EventMeshMessage generateBroadcastMqMsg() {
-        final EventMeshMessage mqMsg = new EventMeshMessage();
-        mqMsg.setTopic(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC);
-        mqMsg.getProperties().put(UtilsConstants.REPLY_TO, 
"localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI");
-        mqMsg.getProperties().put(UtilsConstants.TTL, DEFAULT_TTL_MS);
-        mqMsg.getProperties().put(UtilsConstants.PROPERTY_MESSAGE_REPLY_TO, 
"notnull");
-        mqMsg.getHeaders().put(Constants.DATA_CONTENT_TYPE, "text/plain");
-        mqMsg.setBody(ASYNC_MSG_BODY);
-        return mqMsg;
+        return 
EventMeshMessage(ExampleConstants.EVENTMESH_TCP_ASYNC_TEST_TOPIC, 
UtilsConstants.REPLY_TO , 
"localhost@ProducerGroup-producerPool-9-access#V1_4_0#CI" , 

Review Comment:
   Here it should be the `getEventMeshMessage()` method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to