This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch cloudevents
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/cloudevents by this push:
     new 0036d0a  Change TCP Decoder and Encoder (#621)
0036d0a is described below

commit 0036d0a04063ce04a90b7b921061e89b2e7805ec
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Dec 1 10:13:47 2021 +0800

    Change TCP Decoder and Encoder (#621)
---
 .../common/protocol/tcp/EventMeshMessage.java      |  55 +-----
 .../eventmesh/common/protocol/tcp/Header.java      |  25 +--
 .../eventmesh/common/protocol/tcp/Package.java     |  11 +-
 .../eventmesh/common/protocol/tcp/codec/Codec.java | 210 +++++++++++++--------
 .../eventmesh/tcp/common/EventMeshTestUtils.java   |   5 +-
 .../tcp/demo/pub/cloudevents/AsyncPublish.java     |   1 -
 .../demo/pub/eventmeshmessage/AsyncPublish.java    |   1 -
 .../eventmeshmessage/AsyncPublishBroadcast.java    |   1 -
 .../tcp/demo/pub/eventmeshmessage/SyncRequest.java |   1 -
 .../tcp/demo/sub/cloudevents/AsyncSubscribe.java   |  13 +-
 .../demo/sub/eventmeshmessage/AsyncSubscribe.java  |  21 +--
 .../eventmeshmessage/AsyncSubscribeBroadcast.java  |  10 +-
 .../demo/sub/eventmeshmessage/SyncResponse.java    |   7 +-
 .../cloudevents/CloudEventsProtocolAdaptor.java    |  11 +-
 .../resolver/tcp/TcpMessageProtocolResolver.java   |   8 +-
 .../meshmessage/MeshMessageProtocolAdaptor.java    |  13 +-
 .../resolver/tcp/TcpMessageProtocolResolver.java   |   6 +-
 .../eventmesh/runtime/boot/EventMeshServer.java    |  15 +-
 .../client/group/ClientSessionGroupMapping.java    |   4 +-
 .../eventmesh/client/tcp/EventMeshTCPClient.java   |   2 -
 .../client/tcp/EventMeshTCPPubClient.java          |   2 -
 .../client/tcp/EventMeshTCPSubClient.java          |   2 -
 .../client/tcp/common/ReceiveMsgHook.java          |  17 +-
 .../eventmesh/client/tcp/common/TcpClient.java     |  25 +++
 .../tcp/impl/cloudevent/CloudEventTCPClient.java   |   6 -
 .../impl/cloudevent/CloudEventTCPPubClient.java    |  37 ++--
 .../impl/cloudevent/CloudEventTCPSubClient.java    |  69 +++----
 .../EventMeshMessageTCPClient.java                 |   8 +-
 .../EventMeshMessageTCPPubClient.java              |  24 +--
 .../EventMeshMessageTCPSubClient.java              |  56 +++---
 .../tcp/impl/openmessage/OpenMessageTCPClient.java |   6 -
 .../impl/openmessage/OpenMessageTCPPubClient.java  |   5 -
 .../impl/openmessage/OpenMessageTCPSubClient.java  |   5 -
 33 files changed, 315 insertions(+), 367 deletions(-)

diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/EventMeshMessage.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/EventMeshMessage.java
index 01d744e..a51f9e0 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/EventMeshMessage.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/EventMeshMessage.java
@@ -20,51 +20,16 @@ package org.apache.eventmesh.common.protocol.tcp;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-public class EventMeshMessage {
-
-    private String topic;
-    Map<String, String> properties = new ConcurrentHashMap<>();
-    private String body;
-
-    public EventMeshMessage() {
-    }
-
-    public EventMeshMessage(String topic, Map<String, String> properties, 
String body) {
-        this.topic = topic;
-        this.properties = properties;
-        this.body = body;
-    }
-
-    public String getTopic() {
-        return topic;
-    }
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-    public Map<String, String> getProperties() {
-        return properties;
-    }
-
-    public void setProperties(Map<String, String> properties) {
-        this.properties = properties;
-    }
-
-    public String getBody() {
-        return body;
-    }
-
-    public void setBody(String body) {
-        this.body = body;
-    }
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class EventMeshMessage {
 
-    @Override
-    public String toString() {
-        return "EventMeshMessage{" +
-                "topic='" + topic + '\'' +
-                ", properties=" + properties +
-                ", body='" + body + '\'' +
-                '}';
-    }
+    private String              topic;
+    private Map<String, String> properties = new ConcurrentHashMap<>();
+    private String              body;
 }
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Header.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Header.java
index d3d5cc6..da30aea 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Header.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Header.java
@@ -20,13 +20,16 @@ package org.apache.eventmesh.common.protocol.tcp;
 import java.util.HashMap;
 import java.util.Map;
 
+import lombok.Data;
+
+@Data
 public class Header {
 
-    private Command cmd;
-    private int code;
-    private String desc;
-    private String seq;
-    private Map<String, Object> properties;
+    private Command             cmd;
+    private int                 code;
+    private String              desc;
+    private String              seq;
+    private Map<String, Object> properties = new HashMap<>();
 
     public Header() {
     }
@@ -45,6 +48,7 @@ public class Header {
         this.properties = properties;
     }
 
+
     public Command getCommand() {
         return cmd;
     }
@@ -97,18 +101,7 @@ public class Header {
         if (null == this.properties) {
             this.properties = new HashMap<>();
         }
-
         return this.properties.get(name);
     }
 
-    @Override
-    public String toString() {
-        return "Header{" +
-            "cmd=" + cmd +
-            ", code=" + code +
-            ", desc='" + desc + '\'' +
-            ", seq='" + seq + '\'' +
-            ", properties=" + properties +
-            '}';
-    }
 }
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Package.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Package.java
index f277d06..1a2a223 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Package.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Package.java
@@ -19,23 +19,20 @@ package org.apache.eventmesh.common.protocol.tcp;
 
 import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
 
+import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 @Data
+@NoArgsConstructor
+@AllArgsConstructor
 public class Package implements ProtocolTransportObject {
 
     private Header header;
     private Object body;
 
-    public Package() {
-    }
-
     public Package(Header header) {
         this.header = header;
     }
 
-    public Package(Header header, Object body) {
-        this.header = header;
-        this.body = body;
-    }
 }
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java
index d520e04..191a099 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java
@@ -17,12 +17,23 @@
 
 package org.apache.eventmesh.common.protocol.tcp.codec;
 
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.protocol.tcp.Command;
+import org.apache.eventmesh.common.protocol.tcp.Header;
+import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.tcp.RedirectInfo;
+import org.apache.eventmesh.common.protocol.tcp.Subscription;
+import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+
+import org.apache.commons.lang3.ArrayUtils;
+
 import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.List;
 import java.util.TimeZone;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
@@ -31,53 +42,48 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToByteEncoder;
 import io.netty.handler.codec.ReplayingDecoder;
+import lombok.extern.slf4j.Slf4j;
 
-import org.apache.eventmesh.common.protocol.tcp.Command;
-import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
-import org.apache.eventmesh.common.protocol.tcp.Header;
-import org.apache.eventmesh.common.protocol.tcp.Package;
-import org.apache.eventmesh.common.protocol.tcp.RedirectInfo;
-import org.apache.eventmesh.common.protocol.tcp.Subscription;
-import org.apache.eventmesh.common.protocol.tcp.UserAgent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+@Slf4j
 public class Codec {
 
-    private final static Logger logger = LoggerFactory.getLogger(Codec.class);
-    private static final int FRAME_MAX_LENGTH = 1024 * 1024 * 4;
-    private static Charset UTF8 = Charset.forName("UTF-8");
+    private static final int     FRAME_MAX_LENGTH = 1024 * 1024 * 4;
+    private static final Charset DEFAULT_CHARSET  = 
Charset.forName(Constants.DEFAULT_CHARSET);
 
-    private static final byte[] CONSTANT_MAGIC_FLAG = 
"EventMesh".getBytes(UTF8);
+    private static final byte[] CONSTANT_MAGIC_FLAG = 
serializeBytes("EventMesh");
+    private static final byte[] VERSION             = serializeBytes("0000");
 
-    private static final byte[] VERSION = "0000".getBytes(UTF8);
+    // todo: move to constants
+    public static String CLOUD_EVENTS_PROTOCOL_NAME = "cloudevents";
+    public static String EM_MESSAGE_PROTOCOL_NAME   = "eventmeshmessage";
+    public static String OPEN_MESSAGE_PROTOCOL_NAME = "openmessage";
 
-    private static ObjectMapper jsonMapper;
+    // todo: use json util
+    private static ObjectMapper OBJECT_MAPPER;
 
     static {
-        jsonMapper = new ObjectMapper();
-        jsonMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
-        jsonMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
-        jsonMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
-        jsonMapper.setTimeZone(TimeZone.getDefault());
+        OBJECT_MAPPER = new ObjectMapper();
+        OBJECT_MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+        
OBJECT_MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
+        OBJECT_MAPPER.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
+        OBJECT_MAPPER.setTimeZone(TimeZone.getDefault());
     }
 
     public static class Encoder extends MessageToByteEncoder<Package> {
         @Override
         public void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf 
out) throws Exception {
-            byte[] headerData;
-            byte[] bodyData;
+            final String headerJson = pkg != null ? 
OBJECT_MAPPER.writeValueAsString(pkg.getHeader()) : null;
+            final String bodyJson = pkg != null ? 
OBJECT_MAPPER.writeValueAsString(pkg.getBody()) : null;
 
-            final String headerJson = pkg != null ? 
jsonMapper.writeValueAsString(pkg.getHeader()) : null;
-            final String bodyJson = pkg != null ? 
jsonMapper.writeValueAsString(pkg.getBody()) : null;
+            final byte[] headerData = serializeBytes(headerJson);
+            final byte[] bodyData = serializeBytes(bodyJson);
 
-            headerData = headerJson == null ? null : headerJson.getBytes(UTF8);
-            bodyData = bodyJson == null ? null : bodyJson.getBytes(UTF8);
-
-            logger.debug("headerJson={}|bodyJson={}", headerJson, bodyJson);
+            if (log.isDebugEnabled()) {
+                log.debug("Encoder headerJson={}|bodyJson={}", headerJson, 
bodyJson);
+            }
 
-            int headerLength = headerData == null ? 0 : headerData.length;
-            int bodyLength = bodyData == null ? 0 : bodyData.length;
+            int headerLength = ArrayUtils.getLength(headerData);
+            int bodyLength = ArrayUtils.getLength(bodyData);
 
             int length = 4 + 4 + headerLength + bodyLength;
 
@@ -89,94 +95,140 @@ public class Codec {
             out.writeBytes(VERSION);
             out.writeInt(length);
             out.writeInt(headerLength);
-            if (headerData != null)
+            if (headerData != null) {
                 out.writeBytes(headerData);
-            if (bodyData != null)
+            }
+            if (bodyData != null) {
                 out.writeBytes(bodyData);
+            }
         }
     }
 
-    public static class Decoder extends ReplayingDecoder {
+    public static class Decoder extends ReplayingDecoder<Package> {
         @Override
         public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> 
out) throws Exception {
-            Header header = null;
-            Object body = null;
-
-            int length = 0;
-            int headerLength = 0;
-            int bodyLength = 0;
-
             try {
-                if (null == in)
+                if (null == in) {
                     return;
-
-                byte[] flagBytes = new byte[CONSTANT_MAGIC_FLAG.length];
-                byte[] versionBytes = new byte[VERSION.length];
-
-                in.readBytes(flagBytes);
-                in.readBytes(versionBytes);
-                if (!Arrays.equals(flagBytes, CONSTANT_MAGIC_FLAG) || 
!Arrays.equals(versionBytes, VERSION)) {
-                    String errorMsg = String.format("invalid magic flag or " +
-                            "version|flag=%s|version=%s|remoteAddress=%s", new 
String(flagBytes, UTF8), new String
-                            (versionBytes, UTF8), 
ctx.channel().remoteAddress());
-                    throw new Exception(errorMsg);
                 }
 
-                length = in.readInt();
-                headerLength = in.readInt();
-                bodyLength = length - 8 - headerLength;
-                byte[] headerData = new byte[headerLength];
-                byte[] bodyData = new byte[bodyLength];
+                byte[] flagBytes = parseFlag(in);
+                byte[] versionBytes = parseVersion(in);
+                validateFlag(flagBytes, versionBytes, ctx);
 
-                if (headerLength > 0) {
-                    in.readBytes(headerData);
-                    header = jsonMapper.readValue(new String(headerData, 
UTF8), Header.class);
-                }
-
-                if (bodyLength > 0 && header != null) {
-                    in.readBytes(bodyData);
-                    body = parseFromJson(header.getCommand(), new 
String(bodyData, UTF8));
-                }
-
-                logger.debug("headerJson={}|bodyJson={}", new 
String(headerData, UTF8), new String(bodyData, UTF8));
+                final int length = in.readInt();
+                final int headerLength = in.readInt();
+                final int bodyLength = length - 8 - headerLength;
+                Header header = parseHeader(in, headerLength);
+                Object body = parseBody(in, header, bodyLength);
 
                 Package pkg = new Package(header, body);
                 out.add(pkg);
             } catch (Exception e) {
-                
logger.error("decode|length={}|headerLength={}|bodyLength={}|header={}|body={}.",
 length,
-                        headerLength, bodyLength, header, body);
+                log.error("decode error| receive: {}.", 
deserializeBytes(in.array()));
                 throw e;
             }
         }
+
+        private byte[] parseFlag(ByteBuf in) {
+            final byte[] flagBytes = new byte[CONSTANT_MAGIC_FLAG.length];
+            in.readBytes(flagBytes);
+            return flagBytes;
+        }
+
+        private byte[] parseVersion(ByteBuf in) {
+            final byte[] versionBytes = new byte[VERSION.length];
+            in.readBytes(versionBytes);
+            return versionBytes;
+        }
+
+        private Header parseHeader(ByteBuf in, int headerLength) throws 
JsonProcessingException {
+            if (headerLength <= 0) {
+                return null;
+            }
+            final byte[] headerData = new byte[headerLength];
+            in.readBytes(headerData);
+            if (log.isDebugEnabled()) {
+                log.debug("Decode headerJson={}", 
deserializeBytes(headerData));
+            }
+            return OBJECT_MAPPER.readValue(deserializeBytes(headerData), 
Header.class);
+        }
+
+        private Object parseBody(ByteBuf in, Header header, int bodyLength) 
throws JsonProcessingException {
+            if (bodyLength <= 0 || header == null) {
+                return null;
+            }
+            final byte[] bodyData = new byte[bodyLength];
+            in.readBytes(bodyData);
+            if (log.isDebugEnabled()) {
+                log.debug("Decode bodyJson={}", deserializeBytes(bodyData));
+            }
+            return deserializeBody(deserializeBytes(bodyData), header);
+        }
+
+        private void validateFlag(byte[] flagBytes, byte[] versionBytes, 
ChannelHandlerContext ctx) {
+            if (!Arrays.equals(flagBytes, CONSTANT_MAGIC_FLAG) || 
!Arrays.equals(versionBytes, VERSION)) {
+                String errorMsg = String.format(
+                    "invalid magic flag or 
version|flag=%s|version=%s|remoteAddress=%s",
+                    deserializeBytes(flagBytes), 
deserializeBytes(versionBytes), ctx.channel().remoteAddress());
+                throw new IllegalArgumentException(errorMsg);
+            }
+        }
     }
 
-    private static Object parseFromJson(Command cmd, String data) throws 
Exception {
-        switch (cmd) {
+    private static Object deserializeBody(String bodyJsonString, Header 
header) throws JsonProcessingException {
+        Command command = header.getCommand();
+        switch (command) {
             case HELLO_REQUEST:
             case RECOMMEND_REQUEST:
-                return jsonMapper.readValue(data, UserAgent.class);
+                return OBJECT_MAPPER.readValue(bodyJsonString, 
UserAgent.class);
             case SUBSCRIBE_REQUEST:
             case UNSUBSCRIBE_REQUEST:
-                return jsonMapper.readValue(data, Subscription.class);
+                return OBJECT_MAPPER.readValue(bodyJsonString, 
Subscription.class);
             case REQUEST_TO_SERVER:
             case RESPONSE_TO_SERVER:
             case ASYNC_MESSAGE_TO_SERVER:
             case BROADCAST_MESSAGE_TO_SERVER:
-                return jsonMapper.readValue(data, EventMeshMessage.class);
             case REQUEST_TO_CLIENT:
             case RESPONSE_TO_CLIENT:
             case ASYNC_MESSAGE_TO_CLIENT:
             case BROADCAST_MESSAGE_TO_CLIENT:
-                return jsonMapper.readValue(data, EventMeshMessage.class);
             case REQUEST_TO_CLIENT_ACK:
             case RESPONSE_TO_CLIENT_ACK:
             case ASYNC_MESSAGE_TO_CLIENT_ACK:
             case BROADCAST_MESSAGE_TO_CLIENT_ACK:
-                return jsonMapper.readValue(data, EventMeshMessage.class);
+                // The message json will be deserialized by protocol plugin
+                return bodyJsonString;
             case REDIRECT_TO_CLIENT:
-                return jsonMapper.readValue(data, RedirectInfo.class);
+                return OBJECT_MAPPER.readValue(bodyJsonString, 
RedirectInfo.class);
             default:
+                log.error("Invalidate TCP command: {}", command);
                 return null;
         }
     }
+
+    /**
+     * Deserialize bytes to String.
+     *
+     * @param bytes
+     * @return
+     */
+    private static String deserializeBytes(byte[] bytes) {
+        return new String(bytes, DEFAULT_CHARSET);
+    }
+
+    /**
+     * Serialize String to bytes.
+     *
+     * @param str
+     * @return
+     */
+    private static byte[] serializeBytes(String str) {
+        if (str == null) {
+            return null;
+        }
+        return str.getBytes(DEFAULT_CHARSET);
+    }
+
+
 }
diff --git 
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
 
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
index 8c3a06f..4359832 100644
--- 
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
+++ 
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
@@ -32,6 +32,7 @@ import 
org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
 import org.apache.eventmesh.common.protocol.tcp.Header;
 import org.apache.eventmesh.common.protocol.tcp.Package;
 import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.common.utils.JsonUtils;
 
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
@@ -107,10 +108,10 @@ public class EventMeshTestUtils {
         return msg;
     }
 
-    public static Package rrResponse(Package request) {
+    public static Package rrResponse(EventMeshMessage request) {
         Package msg = new Package();
         msg.setHeader(new Header(RESPONSE_TO_SERVER, 0, null, 
generateRandomString(seqLength)));
-        msg.setBody(request.getBody());
+        msg.setBody(request);
         return msg;
     }
 
diff --git 
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java
 
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java
index f279308..6864bf1 100644
--- 
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java
+++ 
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java
@@ -55,7 +55,6 @@ public class AsyncPublish {
             client =
                 
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, 
CloudEvent.class);
             client.init();
-            client.heartbeat();
 
             for (int i = 0; i < 5; i++) {
                 CloudEvent event = EventMeshTestUtils.generateCloudEventV1();
diff --git 
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java
 
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java
index bc39352..7cf1a64 100644
--- 
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java
+++ 
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java
@@ -53,7 +53,6 @@ public class AsyncPublish {
             client =
                 
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, 
EventMeshMessage.class);
             client.init();
-            client.heartbeat();
 
             for (int i = 0; i < 5; i++) {
                 EventMeshMessage eventMeshMessage = 
EventMeshTestUtils.generateAsyncEventMqMsg();
diff --git 
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java
 
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java
index 07c4de5..642a2c8 100644
--- 
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java
+++ 
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java
@@ -48,7 +48,6 @@ public class AsyncPublishBroadcast {
         try (final EventMeshTCPClient<EventMeshMessage> client =
                  
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, 
EventMeshMessage.class)) {
             client.init();
-            client.heartbeat();
 
             EventMeshMessage eventMeshMessage = 
EventMeshTestUtils.generateBroadcastMqMsg();
             logger.info("begin send broadcast msg============={}", 
eventMeshMessage);
diff --git 
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java
 
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java
index f13ee46..30f2ac1 100644
--- 
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java
+++ 
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java
@@ -41,7 +41,6 @@ public class SyncRequest {
         try (EventMeshTCPClient<EventMeshMessage> client = 
EventMeshTCPClientFactory.createEventMeshTCPClient(
             eventMeshTcpClientConfig, EventMeshMessage.class)) {
             client.init();
-            client.heartbeat();
 
             EventMeshMessage eventMeshMessage = 
EventMeshTestUtils.generateSyncRRMqMsg();
             log.info("begin send rr msg=================={}", 
eventMeshMessage);
diff --git 
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java
 
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java
index b94d12f..b41a65c 100644
--- 
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java
+++ 
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java
@@ -42,7 +42,7 @@ public class AsyncSubscribe implements 
ReceiveMsgHook<CloudEvent> {
 
     public static AsyncSubscribe handler = new AsyncSubscribe();
 
-    private static EventMeshTCPClient client;
+    private static EventMeshTCPClient<CloudEvent> client;
 
     public static void main(String[] agrs) throws Exception {
         Properties properties = 
Utils.readPropertiesFile("application.properties");
@@ -57,7 +57,6 @@ public class AsyncSubscribe implements 
ReceiveMsgHook<CloudEvent> {
         try {
             client = 
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, 
CloudEvent.class);
             client.init();
-            client.heartbeat();
 
             client.subscribe("TEST-TOPIC-TCP-ASYNC", 
SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
             client.registerSubBusiHandler(handler);
@@ -75,13 +74,7 @@ public class AsyncSubscribe implements 
ReceiveMsgHook<CloudEvent> {
     }
 
     @Override
-    public void handle(Package msg, ChannelHandlerContext ctx) {
-        CloudEvent event = convertToProtocolMessage(msg);
-        log.info("receive async msg====================={}", event);
-    }
-
-    @Override
-    public CloudEvent convertToProtocolMessage(Package pkg) {
-        return CloudEventBuilder.from((CloudEvent) pkg.getBody()).build();
+    public void handle(CloudEvent msg, ChannelHandlerContext ctx) {
+        log.info("receive async msg====================={}", msg);
     }
 }
diff --git 
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java
 
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java
index 9df4548..dcc9082 100644
--- 
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java
+++ 
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java
@@ -21,12 +21,12 @@ import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
 import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
 import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
 import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
-import 
org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPClient;
 import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.common.protocol.SubscriptionType;
 import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
 import org.apache.eventmesh.common.protocol.tcp.Package;
 import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet;
 import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
 import org.apache.eventmesh.util.Utils;
 
@@ -40,7 +40,7 @@ public class AsyncSubscribe implements 
ReceiveMsgHook<EventMeshMessage> {
 
     public static AsyncSubscribe handler = new AsyncSubscribe();
 
-    private static EventMeshTCPClient client;
+    private static EventMeshTCPClient<EventMeshMessage> client;
 
     public static void main(String[] agrs) throws Exception {
         Properties properties = 
Utils.readPropertiesFile("application.properties");
@@ -53,11 +53,12 @@ public class AsyncSubscribe implements 
ReceiveMsgHook<EventMeshMessage> {
             .userAgent(userAgent)
             .build();
         try {
-            client = 
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, 
EventMeshMessage.class);
+            client =
+                
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, 
EventMeshMessage.class);
             client.init();
-            client.heartbeat();
 
-            client.subscribe("TEST-TOPIC-TCP-ASYNC", 
SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
+            
client.subscribe(EventMeshTestCaseTopicSet.TOPIC_PRX_WQ2ClientUniCast, 
SubscriptionMode.CLUSTERING,
+                SubscriptionType.ASYNC);
             client.registerSubBusiHandler(handler);
 
             client.listen();
@@ -73,13 +74,7 @@ public class AsyncSubscribe implements 
ReceiveMsgHook<EventMeshMessage> {
     }
 
     @Override
-    public void handle(Package msg, ChannelHandlerContext ctx) {
-        EventMeshMessage eventMeshMessage = convertToProtocolMessage(msg);
-        log.info("receive async msg====================={}", eventMeshMessage);
-    }
-
-    @Override
-    public EventMeshMessage convertToProtocolMessage(Package pkg) {
-        return (EventMeshMessage) pkg.getBody();
+    public void handle(EventMeshMessage msg, ChannelHandlerContext ctx) {
+        log.info("receive async msg====================={}", msg);
     }
 }
diff --git 
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java
 
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java
index 31355e0..09890a3 100644
--- 
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java
+++ 
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java
@@ -52,7 +52,6 @@ public class AsyncSubscribeBroadcast implements 
ReceiveMsgHook<EventMeshMessage>
         try (EventMeshTCPClient<EventMeshMessage> client = 
EventMeshTCPClientFactory.createEventMeshTCPClient(
             eventMeshTcpClientConfig, EventMeshMessage.class)) {
             client.init();
-            client.heartbeat();
 
             client.subscribe("TEST-TOPIC-TCP-BROADCAST", 
SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC);
             client.registerSubBusiHandler(handler);
@@ -65,13 +64,8 @@ public class AsyncSubscribeBroadcast implements 
ReceiveMsgHook<EventMeshMessage>
     }
 
     @Override
-    public void handle(Package msg, ChannelHandlerContext ctx) {
-        EventMeshMessage eventMeshMessage = convertToProtocolMessage(msg);
-        log.info("receive broadcast msg==============={}", eventMeshMessage);
+    public void handle(EventMeshMessage msg, ChannelHandlerContext ctx) {
+        log.info("receive broadcast msg==============={}", msg);
     }
 
-    @Override
-    public EventMeshMessage convertToProtocolMessage(Package pkg) {
-        return (EventMeshMessage) pkg.getBody();
-    }
 }
diff --git 
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java
 
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java
index 5ed9dae..1c5effd 100644
--- 
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java
+++ 
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java
@@ -46,7 +46,6 @@ public class SyncResponse implements 
ReceiveMsgHook<EventMeshMessage> {
         try (EventMeshTCPClient<EventMeshMessage> client = 
EventMeshTCPClientFactory
             .createEventMeshTCPClient(eventMeshTcpClientConfig, 
EventMeshMessage.class)) {
             client.init();
-            client.heartbeat();
 
             client.subscribe("TEST-TOPIC-TCP-SYNC", 
SubscriptionMode.CLUSTERING, SubscriptionType.SYNC);
             // Synchronize RR messages
@@ -60,14 +59,10 @@ public class SyncResponse implements 
ReceiveMsgHook<EventMeshMessage> {
     }
 
     @Override
-    public void handle(Package msg, ChannelHandlerContext ctx) {
+    public void handle(EventMeshMessage msg, ChannelHandlerContext ctx) {
         log.info("receive sync rr msg================{}", msg);
         Package pkg = EventMeshTestUtils.rrResponse(msg);
         ctx.writeAndFlush(pkg);
     }
 
-    @Override
-    public EventMeshMessage convertToProtocolMessage(Package pkg) {
-        return null;
-    }
 }
diff --git 
a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java
 
b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java
index 2ca1e08..b670ef9 100644
--- 
a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java
+++ 
b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java
@@ -50,10 +50,11 @@ public class CloudEventsProtocolAdaptor<T extends 
ProtocolTransportObject>
     public CloudEvent toCloudEvent(ProtocolTransportObject cloudEvent) throws 
ProtocolHandleException {
 
         if (cloudEvent instanceof Package) {
-            Header header = ((Package) cloudEvent).getHeader();
-            Object body = ((Package) cloudEvent).getBody();
+            Package tcpPackage = (Package) cloudEvent;
+            Header header = tcpPackage.getHeader();
+            String cloudEventJson = tcpPackage.getBody().toString();
 
-            return deserializeTcpProtocol(header, body);
+            return deserializeTcpProtocol(header, cloudEventJson);
 
         } else if (cloudEvent instanceof HttpCommand) {
             org.apache.eventmesh.common.protocol.http.header.Header header = 
((HttpCommand) cloudEvent).getHeader();
@@ -66,8 +67,8 @@ public class CloudEventsProtocolAdaptor<T extends 
ProtocolTransportObject>
         }
     }
 
-    private CloudEvent deserializeTcpProtocol(Header header, Object body) 
throws ProtocolHandleException {
-        return TcpMessageProtocolResolver.buildEvent(header, body);
+    private CloudEvent deserializeTcpProtocol(Header header, String 
cloudEventJson) throws ProtocolHandleException {
+        return TcpMessageProtocolResolver.buildEvent(header, cloudEventJson);
     }
 
     private CloudEvent deserializeHttpProtocol(String requestCode, 
org.apache.eventmesh.common.protocol.http.header.Header header, Body body) 
throws ProtocolHandleException {
diff --git 
a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java
 
b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java
index e5fa7f2..0e2b303 100644
--- 
a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java
+++ 
b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java
@@ -11,7 +11,7 @@ import 
org.apache.eventmesh.protocol.cloudevents.CloudEventsProtocolConstant;
 
 public class TcpMessageProtocolResolver {
 
-    public static CloudEvent buildEvent(Header header, Object body) throws 
ProtocolHandleException {
+    public static CloudEvent buildEvent(Header header, String cloudEventJson) 
throws ProtocolHandleException {
         CloudEventBuilder cloudEventBuilder;
 
         String protocolType = 
header.getProperty(Constants.PROTOCOL_TYPE).toString();
@@ -29,7 +29,8 @@ public class TcpMessageProtocolResolver {
             throw new ProtocolHandleException(String.format("Unsupported 
protocolType: %s", protocolType));
         }
         if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
-            cloudEventBuilder = CloudEventBuilder.v1((CloudEvent) body);
+            // todo: transform cloudEventJson to cloudEvent
+            cloudEventBuilder = CloudEventBuilder.v1(null);
 
             for (String propKey : header.getProperties().keySet()) {
                 cloudEventBuilder.withExtension(propKey, 
header.getProperty(propKey).toString());
@@ -38,7 +39,8 @@ public class TcpMessageProtocolResolver {
             return cloudEventBuilder.build();
 
         } else if (StringUtils.equals(SpecVersion.V03.toString(), 
protocolVersion)) {
-            cloudEventBuilder = CloudEventBuilder.v03((CloudEvent) body);
+            // todo: transform cloudEventJson to cloudEvent
+            cloudEventBuilder = CloudEventBuilder.v03(null);
 
             for (String propKey : header.getProperties().keySet()) {
                 cloudEventBuilder.withExtension(propKey, 
header.getProperty(propKey).toString());
diff --git 
a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java
 
b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java
index 9dac295..75a99fd 100644
--- 
a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java
+++ 
b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java
@@ -22,8 +22,10 @@ import 
org.apache.eventmesh.common.protocol.ProtocolTransportObject;
 import org.apache.eventmesh.common.protocol.http.HttpCommand;
 import org.apache.eventmesh.common.protocol.http.body.Body;
 import org.apache.eventmesh.common.protocol.http.common.RequestCode;
+import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
 import org.apache.eventmesh.common.protocol.tcp.Header;
 import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.utils.JsonUtils;
 import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
 import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
 import 
org.apache.eventmesh.protocol.meshmessage.resolver.http.SendMessageBatchProtocolResolver;
@@ -45,10 +47,11 @@ public class MeshMessageProtocolAdaptor implements 
ProtocolAdaptor<ProtocolTrans
     @Override
     public CloudEvent toCloudEvent(ProtocolTransportObject protocol) throws 
ProtocolHandleException {
         if (protocol instanceof Package) {
-            Header header = ((Package) protocol).getHeader();
-            Object body = ((Package) protocol).getBody();
+            Package tcpPackage = (Package) protocol;
+            Header header = tcpPackage.getHeader();
+            String bodyJson = (String) tcpPackage.getBody();
 
-            return deserializeTcpProtocol(header, body);
+            return deserializeTcpProtocol(header, bodyJson);
 
         } else if (protocol instanceof HttpCommand) {
             org.apache.eventmesh.common.protocol.http.header.Header header = 
((HttpCommand) protocol).getHeader();
@@ -61,8 +64,8 @@ public class MeshMessageProtocolAdaptor implements 
ProtocolAdaptor<ProtocolTrans
         }
     }
 
-    private CloudEvent deserializeTcpProtocol(Header header, Object body) 
throws ProtocolHandleException {
-        return TcpMessageProtocolResolver.buildEvent(header, body);
+    private CloudEvent deserializeTcpProtocol(Header header, String bodyJson) 
throws ProtocolHandleException {
+        return TcpMessageProtocolResolver.buildEvent(header, 
JsonUtils.deserialize(bodyJson, EventMeshMessage.class));
     }
 
     private CloudEvent deserializeHttpProtocol(String requestCode,
diff --git 
a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/tcp/TcpMessageProtocolResolver.java
 
b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/tcp/TcpMessageProtocolResolver.java
index 3f69237..e0d2f4e 100644
--- 
a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/tcp/TcpMessageProtocolResolver.java
+++ 
b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/tcp/TcpMessageProtocolResolver.java
@@ -36,7 +36,7 @@ import java.util.Map;
 public class TcpMessageProtocolResolver {
 
 
-    public static CloudEvent buildEvent(Header header, Object body) throws 
ProtocolHandleException {
+    public static CloudEvent buildEvent(Header header, EventMeshMessage 
message) throws ProtocolHandleException {
 
         CloudEventBuilder cloudEventBuilder;
 
@@ -55,8 +55,6 @@ public class TcpMessageProtocolResolver {
             throw new ProtocolHandleException(String.format("Unsupported 
protocolType: %s", protocolType));
         }
 
-        EventMeshMessage message = (EventMeshMessage) body;
-
         String topic = message.getTopic();
 
         String content = message.getBody();
@@ -91,7 +89,6 @@ public class TcpMessageProtocolResolver {
     }
 
     public static Package buildEventMeshMessage(CloudEvent cloudEvent) {
-        Package pkg = new Package();
         EventMeshMessage eventMeshMessage = new EventMeshMessage();
         eventMeshMessage.setTopic(cloudEvent.getSubject());
         eventMeshMessage.setBody(new String(cloudEvent.getData().toBytes(), 
StandardCharsets.UTF_8));
@@ -102,6 +99,7 @@ public class TcpMessageProtocolResolver {
         }
         eventMeshMessage.setProperties(prop);
 
+        Package pkg = new Package();
         pkg.setBody(eventMeshMessage);
 
         return pkg;
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
index ca47338..b6c5459 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
@@ -24,6 +24,7 @@ import 
org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
 import org.apache.eventmesh.runtime.connector.ConnectorResource;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.registry.Registry;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +58,7 @@ public class EventMeshServer {
     }
 
     public void init() throws Exception {
-        if(eventMeshHttpConfiguration != null && 
eventMeshHttpConfiguration.eventMeshServerSecurityEnable){
+        if (eventMeshHttpConfiguration != null && 
eventMeshHttpConfiguration.eventMeshServerSecurityEnable) {
             acl.init(eventMeshHttpConfiguration.eventMeshSecurityPluginType);
         }
 
@@ -74,7 +75,8 @@ public class EventMeshServer {
             eventMeshTCPServer.init();
         }
 
-        String eventStore = 
System.getProperty(EventMeshConstants.EVENT_STORE_PROPERTIES, 
System.getenv(EventMeshConstants.EVENT_STORE_ENV));
+        String eventStore = System
+            .getProperty(EventMeshConstants.EVENT_STORE_PROPERTIES, 
System.getenv(EventMeshConstants.EVENT_STORE_ENV));
         logger.info("eventStore : {}", eventStore);
 
         serviceState = ServiceState.INITED;
@@ -82,11 +84,12 @@ public class EventMeshServer {
     }
 
     public void start() throws Exception {
-        if(eventMeshHttpConfiguration != null && 
eventMeshHttpConfiguration.eventMeshServerSecurityEnable){
+        if (eventMeshHttpConfiguration != null && 
eventMeshHttpConfiguration.eventMeshServerSecurityEnable) {
             acl.start();
         }
 
-        if (eventMeshTCPConfiguration != null && 
eventMeshTCPConfiguration.eventMeshTcpServerEnabled && 
eventMeshTCPConfiguration.eventMeshServerRegistryEnable) {
+        if (eventMeshTCPConfiguration != null && 
eventMeshTCPConfiguration.eventMeshTcpServerEnabled &&
+            eventMeshTCPConfiguration.eventMeshServerRegistryEnable) {
             registry.start();
         }
 
@@ -104,14 +107,14 @@ public class EventMeshServer {
         eventMeshHTTPServer.shutdown();
         if (eventMeshTCPConfiguration != null && 
eventMeshTCPConfiguration.eventMeshTcpServerEnabled) {
             eventMeshTCPServer.shutdown();
-            if(eventMeshTCPConfiguration.eventMeshServerRegistryEnable) {
+            if (eventMeshTCPConfiguration.eventMeshServerRegistryEnable) {
                 registry.shutdown();
             }
         }
 
         connectorResource.release();
 
-        if(eventMeshHttpConfiguration != null && 
eventMeshHttpConfiguration.eventMeshServerSecurityEnable){
+        if (eventMeshHttpConfiguration != null && 
eventMeshHttpConfiguration.eventMeshServerSecurityEnable) {
             acl.shutdown();
         }
         serviceState = ServiceState.STOPED;
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
index 071c104..547d9d5 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
@@ -50,6 +50,8 @@ import org.apache.eventmesh.runtime.util.RemotingHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
+
 public class ClientSessionGroupMapping {
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -490,7 +492,7 @@ public class ClientSessionGroupMapping {
         if (!clientGroupMap.isEmpty()) {
             result = new HashMap<>();
             for (Map.Entry<String, ClientGroupWrapper> entry : 
clientGroupMap.entrySet()) {
-                Map<String, Integer> map = new HashMap();
+                Map<String, Integer> map = new HashMap<>();
                 map.put(EventMeshConstants.PURPOSE_SUB, 
entry.getValue().getGroupConsumerSessions().size());
                 map.put(EventMeshConstants.PURPOSE_PUB, 
entry.getValue().getGroupProducerSessions().size());
                 result.put(entry.getKey(), map);
diff --git 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPClient.java
 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPClient.java
index bec64ca..f378469 100644
--- 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPClient.java
+++ 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPClient.java
@@ -42,8 +42,6 @@ public interface EventMeshTCPClient<ProtocolMessage> extends 
AutoCloseable {
 
     void broadcast(ProtocolMessage msg, long timeout) throws 
EventMeshException;
 
-    void heartbeat() throws EventMeshException;
-
     void listen() throws EventMeshException;
 
     void subscribe(String topic, SubscriptionMode subscriptionMode, 
SubscriptionType subscriptionType)
diff --git 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPPubClient.java
 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPPubClient.java
index bbe9a83..652c99a 100644
--- 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPPubClient.java
+++ 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPPubClient.java
@@ -34,8 +34,6 @@ public interface EventMeshTCPPubClient<ProtocolMessage> 
extends AutoCloseable {
 
     void init() throws EventMeshException;
 
-    void heartbeat() throws EventMeshException;
-
     void reconnect() throws EventMeshException;
 
     // todo: Hide package method, use ProtocolMessage
diff --git 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPSubClient.java
 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPSubClient.java
index 2118648..97cedd8 100644
--- 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPSubClient.java
+++ 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPSubClient.java
@@ -34,8 +34,6 @@ public interface EventMeshTCPSubClient<ProtocolMessage> 
extends AutoCloseable {
 
     void init() throws EventMeshException;
 
-    void heartbeat() throws EventMeshException;
-
     void reconnect() throws EventMeshException;
 
     void subscribe(String topic, SubscriptionMode subscriptionMode, 
SubscriptionType subscriptionType)
diff --git 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/ReceiveMsgHook.java
 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/ReceiveMsgHook.java
index eb29f17..3110dcc 100644
--- 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/ReceiveMsgHook.java
+++ 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/ReceiveMsgHook.java
@@ -17,8 +17,6 @@
 
 package org.apache.eventmesh.client.tcp.common;
 
-import org.apache.eventmesh.common.protocol.tcp.Package;
-
 import io.netty.channel.ChannelHandlerContext;
 
 /**
@@ -26,19 +24,8 @@ import io.netty.channel.ChannelHandlerContext;
  *
  * @param <ProtocolMessage> receive message type.
  */
+@FunctionalInterface
 public interface ReceiveMsgHook<ProtocolMessage> {
-    void handle(Package msg, ChannelHandlerContext ctx);
+    void handle(ProtocolMessage msg, ChannelHandlerContext ctx);
 
-    /**
-     * Convert tcp package to protocolMessage.
-     * <ul>
-     *     <li>{@link org.apache.eventmesh.common.EventMeshMessage} </li>
-     *     <li>{@link io.openmessaging.api.Message}</li>
-     *     <li>{@link io.cloudevents.CloudEvent}</li>
-     * </ul>
-     *
-     * @param pkg
-     * @return
-     */
-    ProtocolMessage convertToProtocolMessage(Package pkg);
 }
diff --git 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java
 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java
index 61098ad..c3c7e65 100644
--- 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java
+++ 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java
@@ -18,6 +18,7 @@
 package org.apache.eventmesh.client.tcp.common;
 
 import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
+import org.apache.eventmesh.common.exception.EventMeshException;
 import org.apache.eventmesh.common.protocol.tcp.Package;
 import org.apache.eventmesh.common.protocol.tcp.codec.Codec;
 
@@ -26,6 +27,7 @@ import java.net.InetSocketAddress;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -66,6 +68,8 @@ public abstract class TcpClient implements Closeable {
 
     private Channel channel;
 
+    private ScheduledFuture<?> heartTask;
+
     protected static final ScheduledThreadPoolExecutor scheduler = new 
ScheduledThreadPoolExecutor(
         Runtime.getRuntime().availableProcessors(),
         new 
ThreadFactoryBuilder().setNameFormat("TCPClientScheduler").setDaemon(true).build());
@@ -107,12 +111,33 @@ public abstract class TcpClient implements Closeable {
         try {
             channel.disconnect().sync();
             workers.shutdownGracefully();
+            if (heartTask != null) {
+                heartTask.cancel(false);
+            }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             log.warn("close tcp client failed.|remote address={}", 
channel.remoteAddress(), e);
         }
     }
 
+    protected void heartbeat() {
+        if (heartTask != null) {
+            synchronized (TcpClient.class) {
+                heartTask = scheduler.scheduleAtFixedRate(() -> {
+                    try {
+                        if (!isActive()) {
+                            reconnect();
+                        }
+                        Package msg = MessageUtils.heartBeat();
+                        io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
+                    } catch (Exception ignore) {
+                        // ignore
+                    }
+                }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, 
TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+
     protected synchronized void reconnect() throws Exception {
         ChannelFuture f = bootstrap.connect(host, port).sync();
         channel = f.channel();
diff --git 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPClient.java
 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPClient.java
index 39faa72..abe0298 100644
--- 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPClient.java
+++ 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPClient.java
@@ -68,12 +68,6 @@ public class CloudEventTCPClient implements 
EventMeshTCPClient<CloudEvent> {
     }
 
     @Override
-    public void heartbeat() throws EventMeshException {
-        cloudEventTCPPubClient.heartbeat();
-        cloudEventTCPSubClient.heartbeat();
-    }
-
-    @Override
     public void listen() throws EventMeshException {
         cloudEventTCPSubClient.listen();
     }
diff --git 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java
 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java
index ca713ad..56dcfa5 100644
--- 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java
+++ 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java
@@ -28,8 +28,11 @@ import 
org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.exception.EventMeshException;
 import org.apache.eventmesh.common.protocol.tcp.Command;
+import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
+import org.apache.eventmesh.common.protocol.tcp.Header;
 import org.apache.eventmesh.common.protocol.tcp.Package;
 import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.common.utils.JsonUtils;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
@@ -64,31 +67,13 @@ class CloudEventTCPPubClient extends TcpClient implements 
EventMeshTCPPubClient<
         try {
             open(new Handler());
             hello();
+            heartbeat();
         } catch (Exception ex) {
             throw new EventMeshException("Initialize 
EventMeshMessageTCPPubClient error", ex);
         }
     }
 
     @Override
-    public void heartbeat() throws EventMeshException {
-        if (task != null) {
-            synchronized (CloudEventTCPPubClient.class) {
-                task = scheduler.scheduleAtFixedRate(() -> {
-                    try {
-                        if (!isActive()) {
-                            reconnect();
-                        }
-                        Package msg = MessageUtils.heartBeat();
-                        io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
-                    } catch (Exception ignore) {
-                        // ignore
-                    }
-                }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, 
TimeUnit.MILLISECONDS);
-            }
-        }
-    }
-
-    @Override
     public void reconnect() throws EventMeshException {
         try {
             super.reconnect();
@@ -154,9 +139,6 @@ class CloudEventTCPPubClient extends TcpClient implements 
EventMeshTCPPubClient<
     @Override
     public void close() {
         try {
-            if (task != null) {
-                task.cancel(false);
-            }
             goodbye();
             super.close();
         } catch (Exception ex) {
@@ -173,10 +155,10 @@ class CloudEventTCPPubClient extends TcpClient implements 
EventMeshTCPPubClient<
 
             Command cmd = msg.getHeader().getCommand();
             if (cmd == Command.RESPONSE_TO_CLIENT) {
+                Package pkg = responseToClientAck(msg);
                 if (callback != null) {
-                    callback.handle(msg, ctx);
+                    callback.handle((CloudEvent) pkg.getBody(), ctx);
                 }
-                Package pkg = MessageUtils.responseToClientAck(msg);
                 send(pkg);
             } else if (cmd == Command.SERVER_GOODBYE_REQUEST) {
                 //TODO
@@ -201,4 +183,11 @@ class CloudEventTCPPubClient extends TcpClient implements 
EventMeshTCPPubClient<
         Package msg = MessageUtils.goodbye();
         this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
     }
+
+    private Package responseToClientAck(Package in) {
+        Package msg = new Package();
+        msg.setHeader(new Header(Command.RESPONSE_TO_CLIENT_ACK, 0, null, 
in.getHeader().getSeq()));
+        msg.setBody(JsonUtils.deserialize(in.getBody().toString(), 
EventMeshMessage.class));
+        return msg;
+    }
 }
diff --git 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java
 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java
index d19f663..9ea509b 100644
--- 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java
+++ 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java
@@ -29,18 +29,20 @@ import 
org.apache.eventmesh.common.protocol.SubscriptionItem;
 import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.common.protocol.SubscriptionType;
 import org.apache.eventmesh.common.protocol.tcp.Command;
+import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
+import org.apache.eventmesh.common.protocol.tcp.Header;
 import org.apache.eventmesh.common.protocol.tcp.Package;
 import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.common.utils.JsonUtils;
 
 import org.apache.commons.collections4.CollectionUtils;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 
 import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import lombok.extern.slf4j.Slf4j;
@@ -54,7 +56,6 @@ class CloudEventTCPSubClient extends TcpClient implements 
EventMeshTCPSubClient<
     private final UserAgent                  userAgent;
     private final List<SubscriptionItem>     subscriptionItems = 
Collections.synchronizedList(new ArrayList<>());
     private       ReceiveMsgHook<CloudEvent> callback;
-    private       ScheduledFuture<?>         task;
 
     public CloudEventTCPSubClient(EventMeshTCPClientConfig 
eventMeshTcpClientConfig) {
         super(eventMeshTcpClientConfig);
@@ -66,6 +67,7 @@ class CloudEventTCPSubClient extends TcpClient implements 
EventMeshTCPSubClient<
         try {
             open(new Handler());
             hello();
+            heartbeat();
             log.info("SimpleSubClientImpl|{}|started!", clientNo);
         } catch (Exception ex) {
             throw new EventMeshException("Initialize 
EventMeshMessageTcpSubClient error", ex);
@@ -73,28 +75,6 @@ class CloudEventTCPSubClient extends TcpClient implements 
EventMeshTCPSubClient<
     }
 
     @Override
-    public void heartbeat() throws EventMeshException {
-        if (task == null) {
-            synchronized (CloudEventTCPSubClient.class) {
-                task = scheduler.scheduleAtFixedRate(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            if (!isActive()) {
-                                reconnect();
-                            }
-                            Package msg = MessageUtils.heartBeat();
-                            io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
-                        } catch (Exception ignore) {
-                            //
-                        }
-                    }
-                }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, 
TimeUnit.MILLISECONDS);
-            }
-        }
-    }
-
-    @Override
     public void reconnect() throws EventMeshException {
         try {
             super.reconnect();
@@ -161,7 +141,6 @@ class CloudEventTCPSubClient extends TcpClient implements 
EventMeshTCPSubClient<
     @Override
     public void close() {
         try {
-            task.cancel(false);
             goodbye();
             super.close();
         } catch (Exception ex) {
@@ -173,24 +152,24 @@ class CloudEventTCPSubClient extends TcpClient implements 
EventMeshTCPSubClient<
         @SuppressWarnings("Duplicates")
         @Override
         protected void channelRead0(ChannelHandlerContext ctx, Package msg) 
throws Exception {
-            Command cmd = msg.getHeader().getCommand();
+            Command cmd = msg.getHeader().getCmd();
             log.info("|receive|type={}|msg={}", cmd, msg);
             if (cmd == Command.REQUEST_TO_CLIENT) {
+                Package pkg = requestToClientAck(msg);
                 if (callback != null) {
-                    callback.handle(msg, ctx);
+                    callback.handle((CloudEvent) pkg.getBody(), ctx);
                 }
-                Package pkg = MessageUtils.requestToClientAck(msg);
                 send(pkg);
             } else if (cmd == Command.ASYNC_MESSAGE_TO_CLIENT) {
-                Package pkg = MessageUtils.asyncMessageAck(msg);
+                Package pkg = asyncMessageAck(msg);
                 if (callback != null) {
-                    callback.handle(msg, ctx);
+                    callback.handle((CloudEvent) msg, ctx);
                 }
                 send(pkg);
             } else if (cmd == Command.BROADCAST_MESSAGE_TO_CLIENT) {
-                Package pkg = MessageUtils.broadcastMessageAck(msg);
+                Package pkg = broadcastMessageAck(msg);
                 if (callback != null) {
-                    callback.handle(msg, ctx);
+                    callback.handle((CloudEvent) msg, ctx);
                 }
                 send(pkg);
             } else if (cmd == Command.SERVER_GOODBYE_REQUEST) {
@@ -208,4 +187,28 @@ class CloudEventTCPSubClient extends TcpClient implements 
EventMeshTCPSubClient<
         }
     }
 
+    private Package requestToClientAck(Package tcpPackage) {
+        Package msg = new Package();
+        msg.setHeader(new Header(Command.REQUEST_TO_CLIENT_ACK, 0, null, 
tcpPackage.getHeader().getSeq()));
+        // todo: Transform json to CloudEvents
+        msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(), 
EventMeshMessage.class));
+        return msg;
+    }
+
+    private Package asyncMessageAck(Package tcpPackage) {
+        Package msg = new Package();
+        msg.setHeader(new Header(Command.ASYNC_MESSAGE_TO_CLIENT_ACK, 0, null, 
tcpPackage.getHeader().getSeq()));
+        // todo: Transform to CloudEvents
+        msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(), 
EventMeshMessage.class));
+        return msg;
+    }
+
+    private Package broadcastMessageAck(Package tcpPackage) {
+        Package msg = new Package();
+        msg.setHeader(new Header(Command.BROADCAST_MESSAGE_TO_CLIENT_ACK, 0, 
null, tcpPackage.getHeader().getSeq()));
+        // todo: Transform to CloudEvents
+        msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(), 
EventMeshMessage.class));
+        return msg;
+    }
+
 }
diff --git 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java
 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java
index 0011435..6faf050 100644
--- 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java
+++ 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java
@@ -41,8 +41,8 @@ public class EventMeshMessageTCPClient implements 
EventMeshTCPClient<EventMeshMe
 
     @Override
     public void init() throws EventMeshException {
-        eventMeshMessageTCPSubClient.init();
         eventMeshMessageTCPPubClient.init();
+        eventMeshMessageTCPSubClient.init();
     }
 
     @Override
@@ -67,12 +67,6 @@ public class EventMeshMessageTCPClient implements 
EventMeshTCPClient<EventMeshMe
     }
 
     @Override
-    public void heartbeat() throws EventMeshException {
-        eventMeshMessageTCPPubClient.heartbeat();
-        eventMeshMessageTCPSubClient.heartbeat();
-    }
-
-    @Override
     public void listen() throws EventMeshException {
         eventMeshMessageTCPSubClient.listen();
     }
diff --git 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java
 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java
index 990322c..fb7cd55 100644
--- 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java
+++ 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java
@@ -34,7 +34,6 @@ import org.apache.eventmesh.common.protocol.tcp.UserAgent;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
@@ -64,6 +63,7 @@ class EventMeshMessageTCPPubClient extends TcpClient 
implements EventMeshTCPPubC
         try {
             open(new Handler());
             hello();
+            heartbeat();
         } catch (Exception ex) {
             throw new EventMeshException("Initialize 
EventMeshMessageTCPPubClient error", ex);
         }
@@ -71,22 +71,6 @@ class EventMeshMessageTCPPubClient extends TcpClient 
implements EventMeshTCPPubC
     }
 
     @Override
-    public void heartbeat() throws EventMeshException {
-        task = scheduler.scheduleAtFixedRate(() -> {
-            try {
-                if (!isActive()) {
-                    reconnect();
-                }
-                Package msg = MessageUtils.heartBeat();
-                io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
-                log.debug("heartbeat to server from pub client|package {}", 
msg);
-            } catch (Exception ignore) {
-                // ignore
-            }
-        }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, 
TimeUnit.MILLISECONDS);
-    }
-
-    @Override
     public void reconnect() throws EventMeshException {
         try {
             super.reconnect();
@@ -124,7 +108,6 @@ class EventMeshMessageTCPPubClient extends TcpClient 
implements EventMeshTCPPubC
     @Override
     public Package publish(EventMeshMessage eventMeshMessage, long timeout) 
throws EventMeshException {
         try {
-            // todo: transform EventMeshMessage to Package
             Package msg = MessageUtils.buildPackage(eventMeshMessage, 
Command.ASYNC_MESSAGE_TO_SERVER);
             log.info("SimplePubClientImpl em 
message|{}|publish|send|type={}|protocol={}|msg={}",
                 clientNo, msg.getHeader().getCommand(),
@@ -175,10 +158,11 @@ class EventMeshMessageTCPPubClient extends TcpClient 
implements EventMeshTCPPubC
 
             Command cmd = msg.getHeader().getCommand();
             if (cmd == Command.RESPONSE_TO_CLIENT) {
+                // todo: Transform to CloudEvents
+                Package pkg = MessageUtils.responseToClientAck(msg);
                 if (callback != null) {
-                    callback.handle(msg, ctx);
+                    callback.handle((EventMeshMessage) pkg.getBody(), ctx);
                 }
-                Package pkg = MessageUtils.responseToClientAck(msg);
                 send(pkg);
             } else if (cmd == Command.SERVER_GOODBYE_REQUEST) {
                 //TODO
diff --git 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java
 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java
index c1887b6..01b9b0d 100644
--- 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java
+++ 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java
@@ -30,8 +30,10 @@ import org.apache.eventmesh.common.protocol.SubscriptionMode;
 import org.apache.eventmesh.common.protocol.SubscriptionType;
 import org.apache.eventmesh.common.protocol.tcp.Command;
 import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
+import org.apache.eventmesh.common.protocol.tcp.Header;
 import org.apache.eventmesh.common.protocol.tcp.Package;
 import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.common.utils.JsonUtils;
 
 import org.apache.commons.collections4.CollectionUtils;
 
@@ -39,7 +41,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
@@ -63,6 +64,7 @@ class EventMeshMessageTCPSubClient extends TcpClient 
implements EventMeshTCPSubC
         try {
             open(new Handler());
             hello();
+            heartbeat();
             log.info("SimpleSubClientImpl|{}|started!", clientNo);
         } catch (Exception ex) {
             throw new EventMeshException("Initialize 
EventMeshMessageTcpSubClient error", ex);
@@ -70,25 +72,6 @@ class EventMeshMessageTCPSubClient extends TcpClient 
implements EventMeshTCPSubC
     }
 
     @Override
-    public void heartbeat() throws EventMeshException {
-        task = scheduler.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    if (!isActive()) {
-                        reconnect();
-                    }
-                    Package msg = MessageUtils.heartBeat();
-                    io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
-                    log.debug("heartbeat to server from sub client|package 
{}", msg);
-                } catch (Exception ignore) {
-                    //
-                }
-            }
-        }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, 
TimeUnit.MILLISECONDS);
-    }
-
-    @Override
     public void reconnect() throws EventMeshException {
         try {
             super.reconnect();
@@ -170,21 +153,21 @@ class EventMeshMessageTCPSubClient extends TcpClient 
implements EventMeshTCPSubC
             Command cmd = msg.getHeader().getCommand();
             log.info("|receive|type={}|msg={}", cmd, msg);
             if (cmd == Command.REQUEST_TO_CLIENT) {
+                Package pkg = requestToClientAck(msg);
                 if (callback != null) {
-                    callback.handle(msg, ctx);
+                    callback.handle((EventMeshMessage) pkg.getBody(), ctx);
                 }
-                Package pkg = MessageUtils.requestToClientAck(msg);
                 send(pkg);
             } else if (cmd == Command.ASYNC_MESSAGE_TO_CLIENT) {
-                Package pkg = MessageUtils.asyncMessageAck(msg);
+                Package pkg = asyncMessageAck(msg);
                 if (callback != null) {
-                    callback.handle(msg, ctx);
+                    callback.handle((EventMeshMessage) pkg.getBody(), ctx);
                 }
                 send(pkg);
             } else if (cmd == Command.BROADCAST_MESSAGE_TO_CLIENT) {
-                Package pkg = MessageUtils.broadcastMessageAck(msg);
+                Package pkg = broadcastMessageAck(msg);
                 if (callback != null) {
-                    callback.handle(msg, ctx);
+                    callback.handle((EventMeshMessage) pkg.getBody(), ctx);
                 }
                 send(pkg);
             } else if (cmd == Command.SERVER_GOODBYE_REQUEST) {
@@ -202,4 +185,25 @@ class EventMeshMessageTCPSubClient extends TcpClient 
implements EventMeshTCPSubC
         }
     }
 
+    private Package requestToClientAck(Package tcpPackage) {
+        Package msg = new Package();
+        msg.setHeader(new Header(Command.REQUEST_TO_CLIENT_ACK, 0, null, 
tcpPackage.getHeader().getSeq()));
+        msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(), 
EventMeshMessage.class));
+        return msg;
+    }
+
+    private Package asyncMessageAck(Package tcpPackage) {
+        Package msg = new Package();
+        msg.setHeader(new Header(Command.ASYNC_MESSAGE_TO_CLIENT_ACK, 0, null, 
tcpPackage.getHeader().getSeq()));
+        msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(), 
EventMeshMessage.class));
+        return msg;
+    }
+
+    private Package broadcastMessageAck(Package tcpPackage) {
+        Package msg = new Package();
+        msg.setHeader(new Header(Command.BROADCAST_MESSAGE_TO_CLIENT_ACK, 0, 
null, tcpPackage.getHeader().getSeq()));
+        msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(), 
EventMeshMessage.class));
+        return msg;
+    }
+
 }
diff --git 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPClient.java
 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPClient.java
index 6381b6f..25a929e 100644
--- 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPClient.java
+++ 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPClient.java
@@ -69,12 +69,6 @@ public class OpenMessageTCPClient implements 
EventMeshTCPClient<Message> {
     }
 
     @Override
-    public void heartbeat() throws EventMeshException {
-        eventMeshTCPPubClient.heartbeat();
-        eventMeshTCPSubClient.heartbeat();
-    }
-
-    @Override
     public void listen() throws EventMeshException {
         eventMeshTCPSubClient.listen();
     }
diff --git 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPPubClient.java
 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPPubClient.java
index 7e3d40d..3eebdd2 100644
--- 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPPubClient.java
+++ 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPPubClient.java
@@ -42,11 +42,6 @@ class OpenMessageTCPPubClient implements 
EventMeshTCPPubClient<Message> {
     }
 
     @Override
-    public void heartbeat() throws EventMeshException {
-
-    }
-
-    @Override
     public void reconnect() throws EventMeshException {
 
     }
diff --git 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPSubClient.java
 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPSubClient.java
index c0de8b1..34c3ad0 100644
--- 
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPSubClient.java
+++ 
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPSubClient.java
@@ -42,11 +42,6 @@ class OpenMessageTCPSubClient implements 
EventMeshTCPSubClient<Message> {
     }
 
     @Override
-    public void heartbeat() throws EventMeshException {
-
-    }
-
-    @Override
     public void reconnect() throws EventMeshException {
 
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to