This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch cloudevents
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/cloudevents by this push:
new 0036d0a Change TCP Decoder and Encoder (#621)
0036d0a is described below
commit 0036d0a04063ce04a90b7b921061e89b2e7805ec
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Dec 1 10:13:47 2021 +0800
Change TCP Decoder and Encoder (#621)
---
.../common/protocol/tcp/EventMeshMessage.java | 55 +-----
.../eventmesh/common/protocol/tcp/Header.java | 25 +--
.../eventmesh/common/protocol/tcp/Package.java | 11 +-
.../eventmesh/common/protocol/tcp/codec/Codec.java | 210 +++++++++++++--------
.../eventmesh/tcp/common/EventMeshTestUtils.java | 5 +-
.../tcp/demo/pub/cloudevents/AsyncPublish.java | 1 -
.../demo/pub/eventmeshmessage/AsyncPublish.java | 1 -
.../eventmeshmessage/AsyncPublishBroadcast.java | 1 -
.../tcp/demo/pub/eventmeshmessage/SyncRequest.java | 1 -
.../tcp/demo/sub/cloudevents/AsyncSubscribe.java | 13 +-
.../demo/sub/eventmeshmessage/AsyncSubscribe.java | 21 +--
.../eventmeshmessage/AsyncSubscribeBroadcast.java | 10 +-
.../demo/sub/eventmeshmessage/SyncResponse.java | 7 +-
.../cloudevents/CloudEventsProtocolAdaptor.java | 11 +-
.../resolver/tcp/TcpMessageProtocolResolver.java | 8 +-
.../meshmessage/MeshMessageProtocolAdaptor.java | 13 +-
.../resolver/tcp/TcpMessageProtocolResolver.java | 6 +-
.../eventmesh/runtime/boot/EventMeshServer.java | 15 +-
.../client/group/ClientSessionGroupMapping.java | 4 +-
.../eventmesh/client/tcp/EventMeshTCPClient.java | 2 -
.../client/tcp/EventMeshTCPPubClient.java | 2 -
.../client/tcp/EventMeshTCPSubClient.java | 2 -
.../client/tcp/common/ReceiveMsgHook.java | 17 +-
.../eventmesh/client/tcp/common/TcpClient.java | 25 +++
.../tcp/impl/cloudevent/CloudEventTCPClient.java | 6 -
.../impl/cloudevent/CloudEventTCPPubClient.java | 37 ++--
.../impl/cloudevent/CloudEventTCPSubClient.java | 69 +++----
.../EventMeshMessageTCPClient.java | 8 +-
.../EventMeshMessageTCPPubClient.java | 24 +--
.../EventMeshMessageTCPSubClient.java | 56 +++---
.../tcp/impl/openmessage/OpenMessageTCPClient.java | 6 -
.../impl/openmessage/OpenMessageTCPPubClient.java | 5 -
.../impl/openmessage/OpenMessageTCPSubClient.java | 5 -
33 files changed, 315 insertions(+), 367 deletions(-)
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/EventMeshMessage.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/EventMeshMessage.java
index 01d744e..a51f9e0 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/EventMeshMessage.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/EventMeshMessage.java
@@ -20,51 +20,16 @@ package org.apache.eventmesh.common.protocol.tcp;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-public class EventMeshMessage {
-
- private String topic;
- Map<String, String> properties = new ConcurrentHashMap<>();
- private String body;
-
- public EventMeshMessage() {
- }
-
- public EventMeshMessage(String topic, Map<String, String> properties,
String body) {
- this.topic = topic;
- this.properties = properties;
- this.body = body;
- }
-
- public String getTopic() {
- return topic;
- }
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public Map<String, String> getProperties() {
- return properties;
- }
-
- public void setProperties(Map<String, String> properties) {
- this.properties = properties;
- }
-
- public String getBody() {
- return body;
- }
-
- public void setBody(String body) {
- this.body = body;
- }
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class EventMeshMessage {
- @Override
- public String toString() {
- return "EventMeshMessage{" +
- "topic='" + topic + '\'' +
- ", properties=" + properties +
- ", body='" + body + '\'' +
- '}';
- }
+ private String topic;
+ private Map<String, String> properties = new ConcurrentHashMap<>();
+ private String body;
}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Header.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Header.java
index d3d5cc6..da30aea 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Header.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Header.java
@@ -20,13 +20,16 @@ package org.apache.eventmesh.common.protocol.tcp;
import java.util.HashMap;
import java.util.Map;
+import lombok.Data;
+
+@Data
public class Header {
- private Command cmd;
- private int code;
- private String desc;
- private String seq;
- private Map<String, Object> properties;
+ private Command cmd;
+ private int code;
+ private String desc;
+ private String seq;
+ private Map<String, Object> properties = new HashMap<>();
public Header() {
}
@@ -45,6 +48,7 @@ public class Header {
this.properties = properties;
}
+
public Command getCommand() {
return cmd;
}
@@ -97,18 +101,7 @@ public class Header {
if (null == this.properties) {
this.properties = new HashMap<>();
}
-
return this.properties.get(name);
}
- @Override
- public String toString() {
- return "Header{" +
- "cmd=" + cmd +
- ", code=" + code +
- ", desc='" + desc + '\'' +
- ", seq='" + seq + '\'' +
- ", properties=" + properties +
- '}';
- }
}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Package.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Package.java
index f277d06..1a2a223 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Package.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Package.java
@@ -19,23 +19,20 @@ package org.apache.eventmesh.common.protocol.tcp;
import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
+import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.NoArgsConstructor;
@Data
+@NoArgsConstructor
+@AllArgsConstructor
public class Package implements ProtocolTransportObject {
private Header header;
private Object body;
- public Package() {
- }
-
public Package(Header header) {
this.header = header;
}
- public Package(Header header, Object body) {
- this.header = header;
- this.body = body;
- }
}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java
index d520e04..191a099 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java
@@ -17,12 +17,23 @@
package org.apache.eventmesh.common.protocol.tcp.codec;
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.protocol.tcp.Command;
+import org.apache.eventmesh.common.protocol.tcp.Header;
+import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.protocol.tcp.RedirectInfo;
+import org.apache.eventmesh.common.protocol.tcp.Subscription;
+import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+
+import org.apache.commons.lang3.ArrayUtils;
+
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import java.util.TimeZone;
import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
@@ -31,53 +42,48 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.ReplayingDecoder;
+import lombok.extern.slf4j.Slf4j;
-import org.apache.eventmesh.common.protocol.tcp.Command;
-import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
-import org.apache.eventmesh.common.protocol.tcp.Header;
-import org.apache.eventmesh.common.protocol.tcp.Package;
-import org.apache.eventmesh.common.protocol.tcp.RedirectInfo;
-import org.apache.eventmesh.common.protocol.tcp.Subscription;
-import org.apache.eventmesh.common.protocol.tcp.UserAgent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+@Slf4j
public class Codec {
- private final static Logger logger = LoggerFactory.getLogger(Codec.class);
- private static final int FRAME_MAX_LENGTH = 1024 * 1024 * 4;
- private static Charset UTF8 = Charset.forName("UTF-8");
+ private static final int FRAME_MAX_LENGTH = 1024 * 1024 * 4;
+ private static final Charset DEFAULT_CHARSET =
Charset.forName(Constants.DEFAULT_CHARSET);
- private static final byte[] CONSTANT_MAGIC_FLAG =
"EventMesh".getBytes(UTF8);
+ private static final byte[] CONSTANT_MAGIC_FLAG =
serializeBytes("EventMesh");
+ private static final byte[] VERSION = serializeBytes("0000");
- private static final byte[] VERSION = "0000".getBytes(UTF8);
+ // todo: move to constants
+ public static String CLOUD_EVENTS_PROTOCOL_NAME = "cloudevents";
+ public static String EM_MESSAGE_PROTOCOL_NAME = "eventmeshmessage";
+ public static String OPEN_MESSAGE_PROTOCOL_NAME = "openmessage";
- private static ObjectMapper jsonMapper;
+ // todo: use json util
+ private static ObjectMapper OBJECT_MAPPER;
static {
- jsonMapper = new ObjectMapper();
- jsonMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
- jsonMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
- jsonMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
- jsonMapper.setTimeZone(TimeZone.getDefault());
+ OBJECT_MAPPER = new ObjectMapper();
+ OBJECT_MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+
OBJECT_MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
+ OBJECT_MAPPER.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
+ OBJECT_MAPPER.setTimeZone(TimeZone.getDefault());
}
public static class Encoder extends MessageToByteEncoder<Package> {
@Override
public void encode(ChannelHandlerContext ctx, Package pkg, ByteBuf
out) throws Exception {
- byte[] headerData;
- byte[] bodyData;
+ final String headerJson = pkg != null ?
OBJECT_MAPPER.writeValueAsString(pkg.getHeader()) : null;
+ final String bodyJson = pkg != null ?
OBJECT_MAPPER.writeValueAsString(pkg.getBody()) : null;
- final String headerJson = pkg != null ?
jsonMapper.writeValueAsString(pkg.getHeader()) : null;
- final String bodyJson = pkg != null ?
jsonMapper.writeValueAsString(pkg.getBody()) : null;
+ final byte[] headerData = serializeBytes(headerJson);
+ final byte[] bodyData = serializeBytes(bodyJson);
- headerData = headerJson == null ? null : headerJson.getBytes(UTF8);
- bodyData = bodyJson == null ? null : bodyJson.getBytes(UTF8);
-
- logger.debug("headerJson={}|bodyJson={}", headerJson, bodyJson);
+ if (log.isDebugEnabled()) {
+ log.debug("Encoder headerJson={}|bodyJson={}", headerJson,
bodyJson);
+ }
- int headerLength = headerData == null ? 0 : headerData.length;
- int bodyLength = bodyData == null ? 0 : bodyData.length;
+ int headerLength = ArrayUtils.getLength(headerData);
+ int bodyLength = ArrayUtils.getLength(bodyData);
int length = 4 + 4 + headerLength + bodyLength;
@@ -89,94 +95,140 @@ public class Codec {
out.writeBytes(VERSION);
out.writeInt(length);
out.writeInt(headerLength);
- if (headerData != null)
+ if (headerData != null) {
out.writeBytes(headerData);
- if (bodyData != null)
+ }
+ if (bodyData != null) {
out.writeBytes(bodyData);
+ }
}
}
- public static class Decoder extends ReplayingDecoder {
+ public static class Decoder extends ReplayingDecoder<Package> {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object>
out) throws Exception {
- Header header = null;
- Object body = null;
-
- int length = 0;
- int headerLength = 0;
- int bodyLength = 0;
-
try {
- if (null == in)
+ if (null == in) {
return;
-
- byte[] flagBytes = new byte[CONSTANT_MAGIC_FLAG.length];
- byte[] versionBytes = new byte[VERSION.length];
-
- in.readBytes(flagBytes);
- in.readBytes(versionBytes);
- if (!Arrays.equals(flagBytes, CONSTANT_MAGIC_FLAG) ||
!Arrays.equals(versionBytes, VERSION)) {
- String errorMsg = String.format("invalid magic flag or " +
- "version|flag=%s|version=%s|remoteAddress=%s", new
String(flagBytes, UTF8), new String
- (versionBytes, UTF8),
ctx.channel().remoteAddress());
- throw new Exception(errorMsg);
}
- length = in.readInt();
- headerLength = in.readInt();
- bodyLength = length - 8 - headerLength;
- byte[] headerData = new byte[headerLength];
- byte[] bodyData = new byte[bodyLength];
+ byte[] flagBytes = parseFlag(in);
+ byte[] versionBytes = parseVersion(in);
+ validateFlag(flagBytes, versionBytes, ctx);
- if (headerLength > 0) {
- in.readBytes(headerData);
- header = jsonMapper.readValue(new String(headerData,
UTF8), Header.class);
- }
-
- if (bodyLength > 0 && header != null) {
- in.readBytes(bodyData);
- body = parseFromJson(header.getCommand(), new
String(bodyData, UTF8));
- }
-
- logger.debug("headerJson={}|bodyJson={}", new
String(headerData, UTF8), new String(bodyData, UTF8));
+ final int length = in.readInt();
+ final int headerLength = in.readInt();
+ final int bodyLength = length - 8 - headerLength;
+ Header header = parseHeader(in, headerLength);
+ Object body = parseBody(in, header, bodyLength);
Package pkg = new Package(header, body);
out.add(pkg);
} catch (Exception e) {
-
logger.error("decode|length={}|headerLength={}|bodyLength={}|header={}|body={}.",
length,
- headerLength, bodyLength, header, body);
+ log.error("decode error| receive: {}.",
deserializeBytes(in.array()));
throw e;
}
}
+
+ private byte[] parseFlag(ByteBuf in) {
+ final byte[] flagBytes = new byte[CONSTANT_MAGIC_FLAG.length];
+ in.readBytes(flagBytes);
+ return flagBytes;
+ }
+
+ private byte[] parseVersion(ByteBuf in) {
+ final byte[] versionBytes = new byte[VERSION.length];
+ in.readBytes(versionBytes);
+ return versionBytes;
+ }
+
+ private Header parseHeader(ByteBuf in, int headerLength) throws
JsonProcessingException {
+ if (headerLength <= 0) {
+ return null;
+ }
+ final byte[] headerData = new byte[headerLength];
+ in.readBytes(headerData);
+ if (log.isDebugEnabled()) {
+ log.debug("Decode headerJson={}",
deserializeBytes(headerData));
+ }
+ return OBJECT_MAPPER.readValue(deserializeBytes(headerData),
Header.class);
+ }
+
+ private Object parseBody(ByteBuf in, Header header, int bodyLength)
throws JsonProcessingException {
+ if (bodyLength <= 0 || header == null) {
+ return null;
+ }
+ final byte[] bodyData = new byte[bodyLength];
+ in.readBytes(bodyData);
+ if (log.isDebugEnabled()) {
+ log.debug("Decode bodyJson={}", deserializeBytes(bodyData));
+ }
+ return deserializeBody(deserializeBytes(bodyData), header);
+ }
+
+ private void validateFlag(byte[] flagBytes, byte[] versionBytes,
ChannelHandlerContext ctx) {
+ if (!Arrays.equals(flagBytes, CONSTANT_MAGIC_FLAG) ||
!Arrays.equals(versionBytes, VERSION)) {
+ String errorMsg = String.format(
+ "invalid magic flag or
version|flag=%s|version=%s|remoteAddress=%s",
+ deserializeBytes(flagBytes),
deserializeBytes(versionBytes), ctx.channel().remoteAddress());
+ throw new IllegalArgumentException(errorMsg);
+ }
+ }
}
- private static Object parseFromJson(Command cmd, String data) throws
Exception {
- switch (cmd) {
+ private static Object deserializeBody(String bodyJsonString, Header
header) throws JsonProcessingException {
+ Command command = header.getCommand();
+ switch (command) {
case HELLO_REQUEST:
case RECOMMEND_REQUEST:
- return jsonMapper.readValue(data, UserAgent.class);
+ return OBJECT_MAPPER.readValue(bodyJsonString,
UserAgent.class);
case SUBSCRIBE_REQUEST:
case UNSUBSCRIBE_REQUEST:
- return jsonMapper.readValue(data, Subscription.class);
+ return OBJECT_MAPPER.readValue(bodyJsonString,
Subscription.class);
case REQUEST_TO_SERVER:
case RESPONSE_TO_SERVER:
case ASYNC_MESSAGE_TO_SERVER:
case BROADCAST_MESSAGE_TO_SERVER:
- return jsonMapper.readValue(data, EventMeshMessage.class);
case REQUEST_TO_CLIENT:
case RESPONSE_TO_CLIENT:
case ASYNC_MESSAGE_TO_CLIENT:
case BROADCAST_MESSAGE_TO_CLIENT:
- return jsonMapper.readValue(data, EventMeshMessage.class);
case REQUEST_TO_CLIENT_ACK:
case RESPONSE_TO_CLIENT_ACK:
case ASYNC_MESSAGE_TO_CLIENT_ACK:
case BROADCAST_MESSAGE_TO_CLIENT_ACK:
- return jsonMapper.readValue(data, EventMeshMessage.class);
+ // The message json will be deserialized by protocol plugin
+ return bodyJsonString;
case REDIRECT_TO_CLIENT:
- return jsonMapper.readValue(data, RedirectInfo.class);
+ return OBJECT_MAPPER.readValue(bodyJsonString,
RedirectInfo.class);
default:
+ log.error("Invalidate TCP command: {}", command);
return null;
}
}
+
+ /**
+ * Deserialize bytes to String.
+ *
+ * @param bytes
+ * @return
+ */
+ private static String deserializeBytes(byte[] bytes) {
+ return new String(bytes, DEFAULT_CHARSET);
+ }
+
+ /**
+ * Serialize String to bytes.
+ *
+ * @param str
+ * @return
+ */
+ private static byte[] serializeBytes(String str) {
+ if (str == null) {
+ return null;
+ }
+ return str.getBytes(DEFAULT_CHARSET);
+ }
+
+
}
diff --git
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
index 8c3a06f..4359832 100644
---
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
+++
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
@@ -32,6 +32,7 @@ import
org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.common.utils.JsonUtils;
import java.net.URI;
import java.nio.charset.StandardCharsets;
@@ -107,10 +108,10 @@ public class EventMeshTestUtils {
return msg;
}
- public static Package rrResponse(Package request) {
+ public static Package rrResponse(EventMeshMessage request) {
Package msg = new Package();
msg.setHeader(new Header(RESPONSE_TO_SERVER, 0, null,
generateRandomString(seqLength)));
- msg.setBody(request.getBody());
+ msg.setBody(request);
return msg;
}
diff --git
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java
index f279308..6864bf1 100644
---
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java
+++
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java
@@ -55,7 +55,6 @@ public class AsyncPublish {
client =
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig,
CloudEvent.class);
client.init();
- client.heartbeat();
for (int i = 0; i < 5; i++) {
CloudEvent event = EventMeshTestUtils.generateCloudEventV1();
diff --git
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java
index bc39352..7cf1a64 100644
---
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java
+++
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublish.java
@@ -53,7 +53,6 @@ public class AsyncPublish {
client =
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig,
EventMeshMessage.class);
client.init();
- client.heartbeat();
for (int i = 0; i < 5; i++) {
EventMeshMessage eventMeshMessage =
EventMeshTestUtils.generateAsyncEventMqMsg();
diff --git
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java
index 07c4de5..642a2c8 100644
---
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java
+++
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/AsyncPublishBroadcast.java
@@ -48,7 +48,6 @@ public class AsyncPublishBroadcast {
try (final EventMeshTCPClient<EventMeshMessage> client =
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig,
EventMeshMessage.class)) {
client.init();
- client.heartbeat();
EventMeshMessage eventMeshMessage =
EventMeshTestUtils.generateBroadcastMqMsg();
logger.info("begin send broadcast msg============={}",
eventMeshMessage);
diff --git
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java
index f13ee46..30f2ac1 100644
---
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java
+++
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/eventmeshmessage/SyncRequest.java
@@ -41,7 +41,6 @@ public class SyncRequest {
try (EventMeshTCPClient<EventMeshMessage> client =
EventMeshTCPClientFactory.createEventMeshTCPClient(
eventMeshTcpClientConfig, EventMeshMessage.class)) {
client.init();
- client.heartbeat();
EventMeshMessage eventMeshMessage =
EventMeshTestUtils.generateSyncRRMqMsg();
log.info("begin send rr msg=================={}",
eventMeshMessage);
diff --git
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java
index b94d12f..b41a65c 100644
---
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java
+++
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/cloudevents/AsyncSubscribe.java
@@ -42,7 +42,7 @@ public class AsyncSubscribe implements
ReceiveMsgHook<CloudEvent> {
public static AsyncSubscribe handler = new AsyncSubscribe();
- private static EventMeshTCPClient client;
+ private static EventMeshTCPClient<CloudEvent> client;
public static void main(String[] agrs) throws Exception {
Properties properties =
Utils.readPropertiesFile("application.properties");
@@ -57,7 +57,6 @@ public class AsyncSubscribe implements
ReceiveMsgHook<CloudEvent> {
try {
client =
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig,
CloudEvent.class);
client.init();
- client.heartbeat();
client.subscribe("TEST-TOPIC-TCP-ASYNC",
SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
client.registerSubBusiHandler(handler);
@@ -75,13 +74,7 @@ public class AsyncSubscribe implements
ReceiveMsgHook<CloudEvent> {
}
@Override
- public void handle(Package msg, ChannelHandlerContext ctx) {
- CloudEvent event = convertToProtocolMessage(msg);
- log.info("receive async msg====================={}", event);
- }
-
- @Override
- public CloudEvent convertToProtocolMessage(Package pkg) {
- return CloudEventBuilder.from((CloudEvent) pkg.getBody()).build();
+ public void handle(CloudEvent msg, ChannelHandlerContext ctx) {
+ log.info("receive async msg====================={}", msg);
}
}
diff --git
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java
index 9df4548..dcc9082 100644
---
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java
+++
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribe.java
@@ -21,12 +21,12 @@ import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.EventMeshTCPClientFactory;
-import
org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPClient;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.tcp.common.EventMeshTestCaseTopicSet;
import org.apache.eventmesh.tcp.common.EventMeshTestUtils;
import org.apache.eventmesh.util.Utils;
@@ -40,7 +40,7 @@ public class AsyncSubscribe implements
ReceiveMsgHook<EventMeshMessage> {
public static AsyncSubscribe handler = new AsyncSubscribe();
- private static EventMeshTCPClient client;
+ private static EventMeshTCPClient<EventMeshMessage> client;
public static void main(String[] agrs) throws Exception {
Properties properties =
Utils.readPropertiesFile("application.properties");
@@ -53,11 +53,12 @@ public class AsyncSubscribe implements
ReceiveMsgHook<EventMeshMessage> {
.userAgent(userAgent)
.build();
try {
- client =
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig,
EventMeshMessage.class);
+ client =
+
EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig,
EventMeshMessage.class);
client.init();
- client.heartbeat();
- client.subscribe("TEST-TOPIC-TCP-ASYNC",
SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
+
client.subscribe(EventMeshTestCaseTopicSet.TOPIC_PRX_WQ2ClientUniCast,
SubscriptionMode.CLUSTERING,
+ SubscriptionType.ASYNC);
client.registerSubBusiHandler(handler);
client.listen();
@@ -73,13 +74,7 @@ public class AsyncSubscribe implements
ReceiveMsgHook<EventMeshMessage> {
}
@Override
- public void handle(Package msg, ChannelHandlerContext ctx) {
- EventMeshMessage eventMeshMessage = convertToProtocolMessage(msg);
- log.info("receive async msg====================={}", eventMeshMessage);
- }
-
- @Override
- public EventMeshMessage convertToProtocolMessage(Package pkg) {
- return (EventMeshMessage) pkg.getBody();
+ public void handle(EventMeshMessage msg, ChannelHandlerContext ctx) {
+ log.info("receive async msg====================={}", msg);
}
}
diff --git
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java
index 31355e0..09890a3 100644
---
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java
+++
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/AsyncSubscribeBroadcast.java
@@ -52,7 +52,6 @@ public class AsyncSubscribeBroadcast implements
ReceiveMsgHook<EventMeshMessage>
try (EventMeshTCPClient<EventMeshMessage> client =
EventMeshTCPClientFactory.createEventMeshTCPClient(
eventMeshTcpClientConfig, EventMeshMessage.class)) {
client.init();
- client.heartbeat();
client.subscribe("TEST-TOPIC-TCP-BROADCAST",
SubscriptionMode.BROADCASTING, SubscriptionType.ASYNC);
client.registerSubBusiHandler(handler);
@@ -65,13 +64,8 @@ public class AsyncSubscribeBroadcast implements
ReceiveMsgHook<EventMeshMessage>
}
@Override
- public void handle(Package msg, ChannelHandlerContext ctx) {
- EventMeshMessage eventMeshMessage = convertToProtocolMessage(msg);
- log.info("receive broadcast msg==============={}", eventMeshMessage);
+ public void handle(EventMeshMessage msg, ChannelHandlerContext ctx) {
+ log.info("receive broadcast msg==============={}", msg);
}
- @Override
- public EventMeshMessage convertToProtocolMessage(Package pkg) {
- return (EventMeshMessage) pkg.getBody();
- }
}
diff --git
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java
index 5ed9dae..1c5effd 100644
---
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java
+++
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/sub/eventmeshmessage/SyncResponse.java
@@ -46,7 +46,6 @@ public class SyncResponse implements
ReceiveMsgHook<EventMeshMessage> {
try (EventMeshTCPClient<EventMeshMessage> client =
EventMeshTCPClientFactory
.createEventMeshTCPClient(eventMeshTcpClientConfig,
EventMeshMessage.class)) {
client.init();
- client.heartbeat();
client.subscribe("TEST-TOPIC-TCP-SYNC",
SubscriptionMode.CLUSTERING, SubscriptionType.SYNC);
// Synchronize RR messages
@@ -60,14 +59,10 @@ public class SyncResponse implements
ReceiveMsgHook<EventMeshMessage> {
}
@Override
- public void handle(Package msg, ChannelHandlerContext ctx) {
+ public void handle(EventMeshMessage msg, ChannelHandlerContext ctx) {
log.info("receive sync rr msg================{}", msg);
Package pkg = EventMeshTestUtils.rrResponse(msg);
ctx.writeAndFlush(pkg);
}
- @Override
- public EventMeshMessage convertToProtocolMessage(Package pkg) {
- return null;
- }
}
diff --git
a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java
b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java
index 2ca1e08..b670ef9 100644
---
a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java
+++
b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java
@@ -50,10 +50,11 @@ public class CloudEventsProtocolAdaptor<T extends
ProtocolTransportObject>
public CloudEvent toCloudEvent(ProtocolTransportObject cloudEvent) throws
ProtocolHandleException {
if (cloudEvent instanceof Package) {
- Header header = ((Package) cloudEvent).getHeader();
- Object body = ((Package) cloudEvent).getBody();
+ Package tcpPackage = (Package) cloudEvent;
+ Header header = tcpPackage.getHeader();
+ String cloudEventJson = tcpPackage.getBody().toString();
- return deserializeTcpProtocol(header, body);
+ return deserializeTcpProtocol(header, cloudEventJson);
} else if (cloudEvent instanceof HttpCommand) {
org.apache.eventmesh.common.protocol.http.header.Header header =
((HttpCommand) cloudEvent).getHeader();
@@ -66,8 +67,8 @@ public class CloudEventsProtocolAdaptor<T extends
ProtocolTransportObject>
}
}
- private CloudEvent deserializeTcpProtocol(Header header, Object body)
throws ProtocolHandleException {
- return TcpMessageProtocolResolver.buildEvent(header, body);
+ private CloudEvent deserializeTcpProtocol(Header header, String
cloudEventJson) throws ProtocolHandleException {
+ return TcpMessageProtocolResolver.buildEvent(header, cloudEventJson);
}
private CloudEvent deserializeHttpProtocol(String requestCode,
org.apache.eventmesh.common.protocol.http.header.Header header, Body body)
throws ProtocolHandleException {
diff --git
a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java
b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java
index e5fa7f2..0e2b303 100644
---
a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java
+++
b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java
@@ -11,7 +11,7 @@ import
org.apache.eventmesh.protocol.cloudevents.CloudEventsProtocolConstant;
public class TcpMessageProtocolResolver {
- public static CloudEvent buildEvent(Header header, Object body) throws
ProtocolHandleException {
+ public static CloudEvent buildEvent(Header header, String cloudEventJson)
throws ProtocolHandleException {
CloudEventBuilder cloudEventBuilder;
String protocolType =
header.getProperty(Constants.PROTOCOL_TYPE).toString();
@@ -29,7 +29,8 @@ public class TcpMessageProtocolResolver {
throw new ProtocolHandleException(String.format("Unsupported
protocolType: %s", protocolType));
}
if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
- cloudEventBuilder = CloudEventBuilder.v1((CloudEvent) body);
+ // todo: transform cloudEventJson to cloudEvent
+ cloudEventBuilder = CloudEventBuilder.v1(null);
for (String propKey : header.getProperties().keySet()) {
cloudEventBuilder.withExtension(propKey,
header.getProperty(propKey).toString());
@@ -38,7 +39,8 @@ public class TcpMessageProtocolResolver {
return cloudEventBuilder.build();
} else if (StringUtils.equals(SpecVersion.V03.toString(),
protocolVersion)) {
- cloudEventBuilder = CloudEventBuilder.v03((CloudEvent) body);
+ // todo: transform cloudEventJson to cloudEvent
+ cloudEventBuilder = CloudEventBuilder.v03(null);
for (String propKey : header.getProperties().keySet()) {
cloudEventBuilder.withExtension(propKey,
header.getProperty(propKey).toString());
diff --git
a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java
b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java
index 9dac295..75a99fd 100644
---
a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java
+++
b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/MeshMessageProtocolAdaptor.java
@@ -22,8 +22,10 @@ import
org.apache.eventmesh.common.protocol.ProtocolTransportObject;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
+import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
import
org.apache.eventmesh.protocol.meshmessage.resolver.http.SendMessageBatchProtocolResolver;
@@ -45,10 +47,11 @@ public class MeshMessageProtocolAdaptor implements
ProtocolAdaptor<ProtocolTrans
@Override
public CloudEvent toCloudEvent(ProtocolTransportObject protocol) throws
ProtocolHandleException {
if (protocol instanceof Package) {
- Header header = ((Package) protocol).getHeader();
- Object body = ((Package) protocol).getBody();
+ Package tcpPackage = (Package) protocol;
+ Header header = tcpPackage.getHeader();
+ String bodyJson = (String) tcpPackage.getBody();
- return deserializeTcpProtocol(header, body);
+ return deserializeTcpProtocol(header, bodyJson);
} else if (protocol instanceof HttpCommand) {
org.apache.eventmesh.common.protocol.http.header.Header header =
((HttpCommand) protocol).getHeader();
@@ -61,8 +64,8 @@ public class MeshMessageProtocolAdaptor implements
ProtocolAdaptor<ProtocolTrans
}
}
- private CloudEvent deserializeTcpProtocol(Header header, Object body)
throws ProtocolHandleException {
- return TcpMessageProtocolResolver.buildEvent(header, body);
+ private CloudEvent deserializeTcpProtocol(Header header, String bodyJson)
throws ProtocolHandleException {
+ return TcpMessageProtocolResolver.buildEvent(header,
JsonUtils.deserialize(bodyJson, EventMeshMessage.class));
}
private CloudEvent deserializeHttpProtocol(String requestCode,
diff --git
a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/tcp/TcpMessageProtocolResolver.java
b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/tcp/TcpMessageProtocolResolver.java
index 3f69237..e0d2f4e 100644
---
a/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/tcp/TcpMessageProtocolResolver.java
+++
b/eventmesh-protocol-plugin/eventmesh-protocol-meshmessage/src/main/java/org/apache/eventmesh/protocol/meshmessage/resolver/tcp/TcpMessageProtocolResolver.java
@@ -36,7 +36,7 @@ import java.util.Map;
public class TcpMessageProtocolResolver {
- public static CloudEvent buildEvent(Header header, Object body) throws
ProtocolHandleException {
+ public static CloudEvent buildEvent(Header header, EventMeshMessage
message) throws ProtocolHandleException {
CloudEventBuilder cloudEventBuilder;
@@ -55,8 +55,6 @@ public class TcpMessageProtocolResolver {
throw new ProtocolHandleException(String.format("Unsupported
protocolType: %s", protocolType));
}
- EventMeshMessage message = (EventMeshMessage) body;
-
String topic = message.getTopic();
String content = message.getBody();
@@ -91,7 +89,6 @@ public class TcpMessageProtocolResolver {
}
public static Package buildEventMeshMessage(CloudEvent cloudEvent) {
- Package pkg = new Package();
EventMeshMessage eventMeshMessage = new EventMeshMessage();
eventMeshMessage.setTopic(cloudEvent.getSubject());
eventMeshMessage.setBody(new String(cloudEvent.getData().toBytes(),
StandardCharsets.UTF_8));
@@ -102,6 +99,7 @@ public class TcpMessageProtocolResolver {
}
eventMeshMessage.setProperties(prop);
+ Package pkg = new Package();
pkg.setBody(eventMeshMessage);
return pkg;
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
index ca47338..b6c5459 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
@@ -24,6 +24,7 @@ import
org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
import org.apache.eventmesh.runtime.connector.ConnectorResource;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.registry.Registry;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +58,7 @@ public class EventMeshServer {
}
public void init() throws Exception {
- if(eventMeshHttpConfiguration != null &&
eventMeshHttpConfiguration.eventMeshServerSecurityEnable){
+ if (eventMeshHttpConfiguration != null &&
eventMeshHttpConfiguration.eventMeshServerSecurityEnable) {
acl.init(eventMeshHttpConfiguration.eventMeshSecurityPluginType);
}
@@ -74,7 +75,8 @@ public class EventMeshServer {
eventMeshTCPServer.init();
}
- String eventStore =
System.getProperty(EventMeshConstants.EVENT_STORE_PROPERTIES,
System.getenv(EventMeshConstants.EVENT_STORE_ENV));
+ String eventStore = System
+ .getProperty(EventMeshConstants.EVENT_STORE_PROPERTIES,
System.getenv(EventMeshConstants.EVENT_STORE_ENV));
logger.info("eventStore : {}", eventStore);
serviceState = ServiceState.INITED;
@@ -82,11 +84,12 @@ public class EventMeshServer {
}
public void start() throws Exception {
- if(eventMeshHttpConfiguration != null &&
eventMeshHttpConfiguration.eventMeshServerSecurityEnable){
+ if (eventMeshHttpConfiguration != null &&
eventMeshHttpConfiguration.eventMeshServerSecurityEnable) {
acl.start();
}
- if (eventMeshTCPConfiguration != null &&
eventMeshTCPConfiguration.eventMeshTcpServerEnabled &&
eventMeshTCPConfiguration.eventMeshServerRegistryEnable) {
+ if (eventMeshTCPConfiguration != null &&
eventMeshTCPConfiguration.eventMeshTcpServerEnabled &&
+ eventMeshTCPConfiguration.eventMeshServerRegistryEnable) {
registry.start();
}
@@ -104,14 +107,14 @@ public class EventMeshServer {
eventMeshHTTPServer.shutdown();
if (eventMeshTCPConfiguration != null &&
eventMeshTCPConfiguration.eventMeshTcpServerEnabled) {
eventMeshTCPServer.shutdown();
- if(eventMeshTCPConfiguration.eventMeshServerRegistryEnable) {
+ if (eventMeshTCPConfiguration.eventMeshServerRegistryEnable) {
registry.shutdown();
}
}
connectorResource.release();
- if(eventMeshHttpConfiguration != null &&
eventMeshHttpConfiguration.eventMeshServerSecurityEnable){
+ if (eventMeshHttpConfiguration != null &&
eventMeshHttpConfiguration.eventMeshServerSecurityEnable) {
acl.shutdown();
}
serviceState = ServiceState.STOPED;
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
index 071c104..547d9d5 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
@@ -50,6 +50,8 @@ import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Maps;
+
public class ClientSessionGroupMapping {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -490,7 +492,7 @@ public class ClientSessionGroupMapping {
if (!clientGroupMap.isEmpty()) {
result = new HashMap<>();
for (Map.Entry<String, ClientGroupWrapper> entry :
clientGroupMap.entrySet()) {
- Map<String, Integer> map = new HashMap();
+ Map<String, Integer> map = new HashMap<>();
map.put(EventMeshConstants.PURPOSE_SUB,
entry.getValue().getGroupConsumerSessions().size());
map.put(EventMeshConstants.PURPOSE_PUB,
entry.getValue().getGroupProducerSessions().size());
result.put(entry.getKey(), map);
diff --git
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPClient.java
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPClient.java
index bec64ca..f378469 100644
---
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPClient.java
+++
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPClient.java
@@ -42,8 +42,6 @@ public interface EventMeshTCPClient<ProtocolMessage> extends
AutoCloseable {
void broadcast(ProtocolMessage msg, long timeout) throws
EventMeshException;
- void heartbeat() throws EventMeshException;
-
void listen() throws EventMeshException;
void subscribe(String topic, SubscriptionMode subscriptionMode,
SubscriptionType subscriptionType)
diff --git
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPPubClient.java
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPPubClient.java
index bbe9a83..652c99a 100644
---
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPPubClient.java
+++
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPPubClient.java
@@ -34,8 +34,6 @@ public interface EventMeshTCPPubClient<ProtocolMessage>
extends AutoCloseable {
void init() throws EventMeshException;
- void heartbeat() throws EventMeshException;
-
void reconnect() throws EventMeshException;
// todo: Hide package method, use ProtocolMessage
diff --git
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPSubClient.java
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPSubClient.java
index 2118648..97cedd8 100644
---
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPSubClient.java
+++
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshTCPSubClient.java
@@ -34,8 +34,6 @@ public interface EventMeshTCPSubClient<ProtocolMessage>
extends AutoCloseable {
void init() throws EventMeshException;
- void heartbeat() throws EventMeshException;
-
void reconnect() throws EventMeshException;
void subscribe(String topic, SubscriptionMode subscriptionMode,
SubscriptionType subscriptionType)
diff --git
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/ReceiveMsgHook.java
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/ReceiveMsgHook.java
index eb29f17..3110dcc 100644
---
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/ReceiveMsgHook.java
+++
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/ReceiveMsgHook.java
@@ -17,8 +17,6 @@
package org.apache.eventmesh.client.tcp.common;
-import org.apache.eventmesh.common.protocol.tcp.Package;
-
import io.netty.channel.ChannelHandlerContext;
/**
@@ -26,19 +24,8 @@ import io.netty.channel.ChannelHandlerContext;
*
* @param <ProtocolMessage> receive message type.
*/
+@FunctionalInterface
public interface ReceiveMsgHook<ProtocolMessage> {
- void handle(Package msg, ChannelHandlerContext ctx);
+ void handle(ProtocolMessage msg, ChannelHandlerContext ctx);
- /**
- * Convert tcp package to protocolMessage.
- * <ul>
- * <li>{@link org.apache.eventmesh.common.EventMeshMessage} </li>
- * <li>{@link io.openmessaging.api.Message}</li>
- * <li>{@link io.cloudevents.CloudEvent}</li>
- * </ul>
- *
- * @param pkg
- * @return
- */
- ProtocolMessage convertToProtocolMessage(Package pkg);
}
diff --git
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java
index 61098ad..c3c7e65 100644
---
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java
+++
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java
@@ -18,6 +18,7 @@
package org.apache.eventmesh.client.tcp.common;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
+import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.codec.Codec;
@@ -26,6 +27,7 @@ import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -66,6 +68,8 @@ public abstract class TcpClient implements Closeable {
private Channel channel;
+ private ScheduledFuture<?> heartTask;
+
protected static final ScheduledThreadPoolExecutor scheduler = new
ScheduledThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
new
ThreadFactoryBuilder().setNameFormat("TCPClientScheduler").setDaemon(true).build());
@@ -107,12 +111,33 @@ public abstract class TcpClient implements Closeable {
try {
channel.disconnect().sync();
workers.shutdownGracefully();
+ if (heartTask != null) {
+ heartTask.cancel(false);
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("close tcp client failed.|remote address={}",
channel.remoteAddress(), e);
}
}
+ protected void heartbeat() {
+ if (heartTask != null) {
+ synchronized (TcpClient.class) {
+ heartTask = scheduler.scheduleAtFixedRate(() -> {
+ try {
+ if (!isActive()) {
+ reconnect();
+ }
+ Package msg = MessageUtils.heartBeat();
+ io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
+ } catch (Exception ignore) {
+ // ignore
+ }
+ }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT,
TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
protected synchronized void reconnect() throws Exception {
ChannelFuture f = bootstrap.connect(host, port).sync();
channel = f.channel();
diff --git
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPClient.java
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPClient.java
index 39faa72..abe0298 100644
---
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPClient.java
+++
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPClient.java
@@ -68,12 +68,6 @@ public class CloudEventTCPClient implements
EventMeshTCPClient<CloudEvent> {
}
@Override
- public void heartbeat() throws EventMeshException {
- cloudEventTCPPubClient.heartbeat();
- cloudEventTCPSubClient.heartbeat();
- }
-
- @Override
public void listen() throws EventMeshException {
cloudEventTCPSubClient.listen();
}
diff --git
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java
index ca713ad..56dcfa5 100644
---
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java
+++
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPPubClient.java
@@ -28,8 +28,11 @@ import
org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.protocol.tcp.Command;
+import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
+import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.common.utils.JsonUtils;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
@@ -64,31 +67,13 @@ class CloudEventTCPPubClient extends TcpClient implements
EventMeshTCPPubClient<
try {
open(new Handler());
hello();
+ heartbeat();
} catch (Exception ex) {
throw new EventMeshException("Initialize
EventMeshMessageTCPPubClient error", ex);
}
}
@Override
- public void heartbeat() throws EventMeshException {
- if (task != null) {
- synchronized (CloudEventTCPPubClient.class) {
- task = scheduler.scheduleAtFixedRate(() -> {
- try {
- if (!isActive()) {
- reconnect();
- }
- Package msg = MessageUtils.heartBeat();
- io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
- } catch (Exception ignore) {
- // ignore
- }
- }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT,
TimeUnit.MILLISECONDS);
- }
- }
- }
-
- @Override
public void reconnect() throws EventMeshException {
try {
super.reconnect();
@@ -154,9 +139,6 @@ class CloudEventTCPPubClient extends TcpClient implements
EventMeshTCPPubClient<
@Override
public void close() {
try {
- if (task != null) {
- task.cancel(false);
- }
goodbye();
super.close();
} catch (Exception ex) {
@@ -173,10 +155,10 @@ class CloudEventTCPPubClient extends TcpClient implements
EventMeshTCPPubClient<
Command cmd = msg.getHeader().getCommand();
if (cmd == Command.RESPONSE_TO_CLIENT) {
+ Package pkg = responseToClientAck(msg);
if (callback != null) {
- callback.handle(msg, ctx);
+ callback.handle((CloudEvent) pkg.getBody(), ctx);
}
- Package pkg = MessageUtils.responseToClientAck(msg);
send(pkg);
} else if (cmd == Command.SERVER_GOODBYE_REQUEST) {
//TODO
@@ -201,4 +183,11 @@ class CloudEventTCPPubClient extends TcpClient implements
EventMeshTCPPubClient<
Package msg = MessageUtils.goodbye();
this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
}
+
+ private Package responseToClientAck(Package in) {
+ Package msg = new Package();
+ msg.setHeader(new Header(Command.RESPONSE_TO_CLIENT_ACK, 0, null,
in.getHeader().getSeq()));
+ msg.setBody(JsonUtils.deserialize(in.getBody().toString(),
EventMeshMessage.class));
+ return msg;
+ }
}
diff --git
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java
index d19f663..9ea509b 100644
---
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java
+++
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java
@@ -29,18 +29,20 @@ import
org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Command;
+import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
+import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
@@ -54,7 +56,6 @@ class CloudEventTCPSubClient extends TcpClient implements
EventMeshTCPSubClient<
private final UserAgent userAgent;
private final List<SubscriptionItem> subscriptionItems =
Collections.synchronizedList(new ArrayList<>());
private ReceiveMsgHook<CloudEvent> callback;
- private ScheduledFuture<?> task;
public CloudEventTCPSubClient(EventMeshTCPClientConfig
eventMeshTcpClientConfig) {
super(eventMeshTcpClientConfig);
@@ -66,6 +67,7 @@ class CloudEventTCPSubClient extends TcpClient implements
EventMeshTCPSubClient<
try {
open(new Handler());
hello();
+ heartbeat();
log.info("SimpleSubClientImpl|{}|started!", clientNo);
} catch (Exception ex) {
throw new EventMeshException("Initialize
EventMeshMessageTcpSubClient error", ex);
@@ -73,28 +75,6 @@ class CloudEventTCPSubClient extends TcpClient implements
EventMeshTCPSubClient<
}
@Override
- public void heartbeat() throws EventMeshException {
- if (task == null) {
- synchronized (CloudEventTCPSubClient.class) {
- task = scheduler.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- if (!isActive()) {
- reconnect();
- }
- Package msg = MessageUtils.heartBeat();
- io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
- } catch (Exception ignore) {
- //
- }
- }
- }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT,
TimeUnit.MILLISECONDS);
- }
- }
- }
-
- @Override
public void reconnect() throws EventMeshException {
try {
super.reconnect();
@@ -161,7 +141,6 @@ class CloudEventTCPSubClient extends TcpClient implements
EventMeshTCPSubClient<
@Override
public void close() {
try {
- task.cancel(false);
goodbye();
super.close();
} catch (Exception ex) {
@@ -173,24 +152,24 @@ class CloudEventTCPSubClient extends TcpClient implements
EventMeshTCPSubClient<
@SuppressWarnings("Duplicates")
@Override
protected void channelRead0(ChannelHandlerContext ctx, Package msg)
throws Exception {
- Command cmd = msg.getHeader().getCommand();
+ Command cmd = msg.getHeader().getCmd();
log.info("|receive|type={}|msg={}", cmd, msg);
if (cmd == Command.REQUEST_TO_CLIENT) {
+ Package pkg = requestToClientAck(msg);
if (callback != null) {
- callback.handle(msg, ctx);
+ callback.handle((CloudEvent) pkg.getBody(), ctx);
}
- Package pkg = MessageUtils.requestToClientAck(msg);
send(pkg);
} else if (cmd == Command.ASYNC_MESSAGE_TO_CLIENT) {
- Package pkg = MessageUtils.asyncMessageAck(msg);
+ Package pkg = asyncMessageAck(msg);
if (callback != null) {
- callback.handle(msg, ctx);
+ callback.handle((CloudEvent) msg, ctx);
}
send(pkg);
} else if (cmd == Command.BROADCAST_MESSAGE_TO_CLIENT) {
- Package pkg = MessageUtils.broadcastMessageAck(msg);
+ Package pkg = broadcastMessageAck(msg);
if (callback != null) {
- callback.handle(msg, ctx);
+ callback.handle((CloudEvent) msg, ctx);
}
send(pkg);
} else if (cmd == Command.SERVER_GOODBYE_REQUEST) {
@@ -208,4 +187,28 @@ class CloudEventTCPSubClient extends TcpClient implements
EventMeshTCPSubClient<
}
}
+ private Package requestToClientAck(Package tcpPackage) {
+ Package msg = new Package();
+ msg.setHeader(new Header(Command.REQUEST_TO_CLIENT_ACK, 0, null,
tcpPackage.getHeader().getSeq()));
+ // todo: Transform json to CloudEvents
+ msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(),
EventMeshMessage.class));
+ return msg;
+ }
+
+ private Package asyncMessageAck(Package tcpPackage) {
+ Package msg = new Package();
+ msg.setHeader(new Header(Command.ASYNC_MESSAGE_TO_CLIENT_ACK, 0, null,
tcpPackage.getHeader().getSeq()));
+ // todo: Transform to CloudEvents
+ msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(),
EventMeshMessage.class));
+ return msg;
+ }
+
+ private Package broadcastMessageAck(Package tcpPackage) {
+ Package msg = new Package();
+ msg.setHeader(new Header(Command.BROADCAST_MESSAGE_TO_CLIENT_ACK, 0,
null, tcpPackage.getHeader().getSeq()));
+ // todo: Transform to CloudEvents
+ msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(),
EventMeshMessage.class));
+ return msg;
+ }
+
}
diff --git
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java
index 0011435..6faf050 100644
---
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java
+++
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPClient.java
@@ -41,8 +41,8 @@ public class EventMeshMessageTCPClient implements
EventMeshTCPClient<EventMeshMe
@Override
public void init() throws EventMeshException {
- eventMeshMessageTCPSubClient.init();
eventMeshMessageTCPPubClient.init();
+ eventMeshMessageTCPSubClient.init();
}
@Override
@@ -67,12 +67,6 @@ public class EventMeshMessageTCPClient implements
EventMeshTCPClient<EventMeshMe
}
@Override
- public void heartbeat() throws EventMeshException {
- eventMeshMessageTCPPubClient.heartbeat();
- eventMeshMessageTCPSubClient.heartbeat();
- }
-
- @Override
public void listen() throws EventMeshException {
eventMeshMessageTCPSubClient.listen();
}
diff --git
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java
index 990322c..fb7cd55 100644
---
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java
+++
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPPubClient.java
@@ -34,7 +34,6 @@ import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
@@ -64,6 +63,7 @@ class EventMeshMessageTCPPubClient extends TcpClient
implements EventMeshTCPPubC
try {
open(new Handler());
hello();
+ heartbeat();
} catch (Exception ex) {
throw new EventMeshException("Initialize
EventMeshMessageTCPPubClient error", ex);
}
@@ -71,22 +71,6 @@ class EventMeshMessageTCPPubClient extends TcpClient
implements EventMeshTCPPubC
}
@Override
- public void heartbeat() throws EventMeshException {
- task = scheduler.scheduleAtFixedRate(() -> {
- try {
- if (!isActive()) {
- reconnect();
- }
- Package msg = MessageUtils.heartBeat();
- io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
- log.debug("heartbeat to server from pub client|package {}",
msg);
- } catch (Exception ignore) {
- // ignore
- }
- }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT,
TimeUnit.MILLISECONDS);
- }
-
- @Override
public void reconnect() throws EventMeshException {
try {
super.reconnect();
@@ -124,7 +108,6 @@ class EventMeshMessageTCPPubClient extends TcpClient
implements EventMeshTCPPubC
@Override
public Package publish(EventMeshMessage eventMeshMessage, long timeout)
throws EventMeshException {
try {
- // todo: transform EventMeshMessage to Package
Package msg = MessageUtils.buildPackage(eventMeshMessage,
Command.ASYNC_MESSAGE_TO_SERVER);
log.info("SimplePubClientImpl em
message|{}|publish|send|type={}|protocol={}|msg={}",
clientNo, msg.getHeader().getCommand(),
@@ -175,10 +158,11 @@ class EventMeshMessageTCPPubClient extends TcpClient
implements EventMeshTCPPubC
Command cmd = msg.getHeader().getCommand();
if (cmd == Command.RESPONSE_TO_CLIENT) {
+ // todo: Transform to CloudEvents
+ Package pkg = MessageUtils.responseToClientAck(msg);
if (callback != null) {
- callback.handle(msg, ctx);
+ callback.handle((EventMeshMessage) pkg.getBody(), ctx);
}
- Package pkg = MessageUtils.responseToClientAck(msg);
send(pkg);
} else if (cmd == Command.SERVER_GOODBYE_REQUEST) {
//TODO
diff --git
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java
index c1887b6..01b9b0d 100644
---
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java
+++
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/eventmeshmessage/EventMeshMessageTCPSubClient.java
@@ -30,8 +30,10 @@ import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
+import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.commons.collections4.CollectionUtils;
@@ -39,7 +41,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
@@ -63,6 +64,7 @@ class EventMeshMessageTCPSubClient extends TcpClient
implements EventMeshTCPSubC
try {
open(new Handler());
hello();
+ heartbeat();
log.info("SimpleSubClientImpl|{}|started!", clientNo);
} catch (Exception ex) {
throw new EventMeshException("Initialize
EventMeshMessageTcpSubClient error", ex);
@@ -70,25 +72,6 @@ class EventMeshMessageTCPSubClient extends TcpClient
implements EventMeshTCPSubC
}
@Override
- public void heartbeat() throws EventMeshException {
- task = scheduler.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- if (!isActive()) {
- reconnect();
- }
- Package msg = MessageUtils.heartBeat();
- io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
- log.debug("heartbeat to server from sub client|package
{}", msg);
- } catch (Exception ignore) {
- //
- }
- }
- }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT,
TimeUnit.MILLISECONDS);
- }
-
- @Override
public void reconnect() throws EventMeshException {
try {
super.reconnect();
@@ -170,21 +153,21 @@ class EventMeshMessageTCPSubClient extends TcpClient
implements EventMeshTCPSubC
Command cmd = msg.getHeader().getCommand();
log.info("|receive|type={}|msg={}", cmd, msg);
if (cmd == Command.REQUEST_TO_CLIENT) {
+ Package pkg = requestToClientAck(msg);
if (callback != null) {
- callback.handle(msg, ctx);
+ callback.handle((EventMeshMessage) pkg.getBody(), ctx);
}
- Package pkg = MessageUtils.requestToClientAck(msg);
send(pkg);
} else if (cmd == Command.ASYNC_MESSAGE_TO_CLIENT) {
- Package pkg = MessageUtils.asyncMessageAck(msg);
+ Package pkg = asyncMessageAck(msg);
if (callback != null) {
- callback.handle(msg, ctx);
+ callback.handle((EventMeshMessage) pkg.getBody(), ctx);
}
send(pkg);
} else if (cmd == Command.BROADCAST_MESSAGE_TO_CLIENT) {
- Package pkg = MessageUtils.broadcastMessageAck(msg);
+ Package pkg = broadcastMessageAck(msg);
if (callback != null) {
- callback.handle(msg, ctx);
+ callback.handle((EventMeshMessage) pkg.getBody(), ctx);
}
send(pkg);
} else if (cmd == Command.SERVER_GOODBYE_REQUEST) {
@@ -202,4 +185,25 @@ class EventMeshMessageTCPSubClient extends TcpClient
implements EventMeshTCPSubC
}
}
+ private Package requestToClientAck(Package tcpPackage) {
+ Package msg = new Package();
+ msg.setHeader(new Header(Command.REQUEST_TO_CLIENT_ACK, 0, null,
tcpPackage.getHeader().getSeq()));
+ msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(),
EventMeshMessage.class));
+ return msg;
+ }
+
+ private Package asyncMessageAck(Package tcpPackage) {
+ Package msg = new Package();
+ msg.setHeader(new Header(Command.ASYNC_MESSAGE_TO_CLIENT_ACK, 0, null,
tcpPackage.getHeader().getSeq()));
+ msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(),
EventMeshMessage.class));
+ return msg;
+ }
+
+ private Package broadcastMessageAck(Package tcpPackage) {
+ Package msg = new Package();
+ msg.setHeader(new Header(Command.BROADCAST_MESSAGE_TO_CLIENT_ACK, 0,
null, tcpPackage.getHeader().getSeq()));
+ msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(),
EventMeshMessage.class));
+ return msg;
+ }
+
}
diff --git
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPClient.java
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPClient.java
index 6381b6f..25a929e 100644
---
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPClient.java
+++
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPClient.java
@@ -69,12 +69,6 @@ public class OpenMessageTCPClient implements
EventMeshTCPClient<Message> {
}
@Override
- public void heartbeat() throws EventMeshException {
- eventMeshTCPPubClient.heartbeat();
- eventMeshTCPSubClient.heartbeat();
- }
-
- @Override
public void listen() throws EventMeshException {
eventMeshTCPSubClient.listen();
}
diff --git
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPPubClient.java
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPPubClient.java
index 7e3d40d..3eebdd2 100644
---
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPPubClient.java
+++
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPPubClient.java
@@ -42,11 +42,6 @@ class OpenMessageTCPPubClient implements
EventMeshTCPPubClient<Message> {
}
@Override
- public void heartbeat() throws EventMeshException {
-
- }
-
- @Override
public void reconnect() throws EventMeshException {
}
diff --git
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPSubClient.java
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPSubClient.java
index c0de8b1..34c3ad0 100644
---
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPSubClient.java
+++
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/openmessage/OpenMessageTCPSubClient.java
@@ -42,11 +42,6 @@ class OpenMessageTCPSubClient implements
EventMeshTCPSubClient<Message> {
}
@Override
- public void heartbeat() throws EventMeshException {
-
- }
-
- @Override
public void reconnect() throws EventMeshException {
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]