This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch cloudevents
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/cloudevents by this push:
new 5d2721f cloudevents protocol tcp pub/sub for sdk (#622)
5d2721f is described below
commit 5d2721ffe21e237a93f8ae7e71bcec52e64e75c8
Author: mike_xwm <[email protected]>
AuthorDate: Wed Dec 1 21:40:08 2021 +0800
cloudevents protocol tcp pub/sub for sdk (#622)
1.cloudevents protocol tcp pub/sub for sdk
---
build.gradle | 1 +
.../eventmesh/common/protocol/tcp/codec/Codec.java | 8 +++-
.../eventmesh/tcp/common/EventMeshTestUtils.java | 10 ++++-
.../eventmesh-protocol-cloudevents/build.gradle | 1 +
.../cloudevents/CloudEventsProtocolAdaptor.java | 6 ++-
.../http/SendMessageBatchProtocolResolver.java | 17 ++++++++
.../http/SendMessageBatchV2ProtocolResolver.java | 17 ++++++++
.../http/SendMessageRequestProtocolResolver.java | 17 ++++++++
.../resolver/tcp/TcpMessageProtocolResolver.java | 49 +++++++++++++++++-----
eventmesh-sdk-java/build.gradle | 1 +
.../eventmesh/client/tcp/common/MessageUtils.java | 13 +++++-
.../impl/cloudevent/CloudEventTCPSubClient.java | 26 ++++++++----
12 files changed, 141 insertions(+), 25 deletions(-)
diff --git a/build.gradle b/build.gradle
index 2aa26fb..260c26b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -450,6 +450,7 @@ subprojects {
dependency "org.powermock:powermock-api-mockito2:2.0.2"
dependency "io.cloudevents:cloudevents-core:2.2.0"
+ dependency "io.cloudevents:cloudevents-json-jackson:2.2.0"
}
}
}
\ No newline at end of file
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java
index 191a099..9570100 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java
@@ -76,7 +76,13 @@ public class Codec {
final String bodyJson = pkg != null ?
OBJECT_MAPPER.writeValueAsString(pkg.getBody()) : null;
final byte[] headerData = serializeBytes(headerJson);
- final byte[] bodyData = serializeBytes(bodyJson);
+// final byte[] bodyData = serializeBytes(bodyJson);
+
+ byte[] bodyData = serializeBytes(bodyJson);
+
+ if (headerJson.contains("cloudevents")) {
+ bodyData = (byte[]) pkg.getBody();
+ }
if (log.isDebugEnabled()) {
log.debug("Encoder headerJson={}|bodyJson={}", headerJson,
bodyJson);
diff --git
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
index 4359832..8944482 100644
---
a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
+++
b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
@@ -36,6 +36,8 @@ import org.apache.eventmesh.common.utils.JsonUtils;
import java.net.URI;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@@ -165,12 +167,16 @@ public class EventMeshTestUtils {
}
public static CloudEvent generateCloudEventV1() {
+ Map<String, String> content = new HashMap<>();
+ content.put("content", "testAsyncMessage");
+
CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
- .withSubject(TOPIC_PRX_WQ2ClientBroadCast)
+ .withSubject(TOPIC_PRX_WQ2ClientUniCast)
.withSource(URI.create("/"))
+ .withDataContentType("application/cloudevents+json")
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
- .withData("testAsyncMessage".getBytes(StandardCharsets.UTF_8))
+
.withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
.withExtension("ttl", "30000")
.build();
return event;
diff --git
a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/build.gradle
b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/build.gradle
index 4a82353..cf2b974 100644
--- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/build.gradle
+++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/build.gradle
@@ -18,6 +18,7 @@
dependencies {
compileOnly project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
implementation "io.cloudevents:cloudevents-core"
+ implementation "io.cloudevents:cloudevents-json-jackson"
testImplementation
project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
testImplementation "io.cloudevents:cloudevents-core"
diff --git
a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java
b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java
index b670ef9..6e540e6 100644
---
a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java
+++
b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java
@@ -18,6 +18,8 @@
package org.apache.eventmesh.protocol.cloudevents;
import io.cloudevents.CloudEvent;
+import io.cloudevents.core.provider.EventFormatProvider;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
@@ -111,7 +113,9 @@ public class CloudEventsProtocolAdaptor<T extends
ProtocolTransportObject>
return httpCommand;
} else if (StringUtils.equals("tcp", protocolDesc)) {
Package pkg = new Package();
- pkg.setBody(cloudEvent);
+ byte[] bodyByte =
EventFormatProvider.getInstance().resolveFormat(cloudEvent.getDataContentType())
+ .serialize(cloudEvent);
+ pkg.setBody(bodyByte);
return pkg;
} else {
throw new ProtocolHandleException(String.format("Unsupported
protocolDesc: %s", protocolDesc));
diff --git
a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchProtocolResolver.java
b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchProtocolResolver.java
index 6ea8d26..2b4f180 100644
---
a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchProtocolResolver.java
+++
b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchProtocolResolver.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.eventmesh.protocol.cloudevents.resolver.http;
import io.cloudevents.CloudEvent;
diff --git
a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java
b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java
index 2c33aec..a863bb5 100644
---
a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java
+++
b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.eventmesh.protocol.cloudevents.resolver.http;
import io.cloudevents.CloudEvent;
diff --git
a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageRequestProtocolResolver.java
b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageRequestProtocolResolver.java
index 7ef619a..6c80dd7 100644
---
a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageRequestProtocolResolver.java
+++
b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageRequestProtocolResolver.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.eventmesh.protocol.cloudevents.resolver.http;
import io.cloudevents.CloudEvent;
diff --git
a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java
b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java
index 0e2b303..bec9b8d 100644
---
a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java
+++
b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java
@@ -1,17 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.eventmesh.protocol.cloudevents.resolver.tcp;
-import io.cloudevents.CloudEvent;
-import io.cloudevents.SpecVersion;
-import io.cloudevents.core.builder.CloudEventBuilder;
-import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.tcp.Header;
+import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
import org.apache.eventmesh.protocol.cloudevents.CloudEventsProtocolConstant;
+import org.apache.commons.lang3.StringUtils;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.core.provider.EventFormatProvider;
+import io.cloudevents.jackson.JsonFormat;
+
+import java.nio.charset.StandardCharsets;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+
public class TcpMessageProtocolResolver {
- public static CloudEvent buildEvent(Header header, String cloudEventJson)
throws ProtocolHandleException {
+ public static CloudEvent buildEvent(Header header, String cloudEventJson)
+ throws ProtocolHandleException {
CloudEventBuilder cloudEventBuilder;
String protocolType =
header.getProperty(Constants.PROTOCOL_TYPE).toString();
@@ -28,10 +55,11 @@ public class TcpMessageProtocolResolver {
if (!StringUtils.equals(CloudEventsProtocolConstant.PROTOCOL_NAME,
protocolType)) {
throw new ProtocolHandleException(String.format("Unsupported
protocolType: %s", protocolType));
}
- if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
- // todo: transform cloudEventJson to cloudEvent
- cloudEventBuilder = CloudEventBuilder.v1(null);
+ if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
+ CloudEvent event =
EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE).deserialize(cloudEventJson.getBytes(
+ StandardCharsets.UTF_8));
+ cloudEventBuilder = CloudEventBuilder.v1(event);
for (String propKey : header.getProperties().keySet()) {
cloudEventBuilder.withExtension(propKey,
header.getProperty(propKey).toString());
}
@@ -39,8 +67,9 @@ public class TcpMessageProtocolResolver {
return cloudEventBuilder.build();
} else if (StringUtils.equals(SpecVersion.V03.toString(),
protocolVersion)) {
- // todo: transform cloudEventJson to cloudEvent
- cloudEventBuilder = CloudEventBuilder.v03(null);
+ CloudEvent event =
EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE).deserialize(cloudEventJson.getBytes(
+ StandardCharsets.UTF_8));
+ cloudEventBuilder = CloudEventBuilder.v03(event);
for (String propKey : header.getProperties().keySet()) {
cloudEventBuilder.withExtension(propKey,
header.getProperty(propKey).toString());
diff --git a/eventmesh-sdk-java/build.gradle b/eventmesh-sdk-java/build.gradle
index 108e418..a60dc14 100644
--- a/eventmesh-sdk-java/build.gradle
+++ b/eventmesh-sdk-java/build.gradle
@@ -27,6 +27,7 @@ dependencies {
// protocol
implementation "io.cloudevents:cloudevents-core"
+ implementation "io.cloudevents:cloudevents-json-jackson"
implementation "io.openmessaging:openmessaging-api"
compileOnly 'org.projectlombok:lombok:1.18.22'
diff --git
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
index 16cbeef..d1020d4 100644
---
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
+++
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
@@ -27,13 +27,17 @@ import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.Subscription;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.common.utils.JsonUtils;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
+import io.cloudevents.core.provider.EventFormatProvider;
+import io.cloudevents.jackson.JsonFormat;
import io.openmessaging.api.Message;
public class MessageUtils {
@@ -91,9 +95,15 @@ public class MessageUtils {
if (message instanceof CloudEvent) {
msg.getHeader().putProperty(Constants.PROTOCOL_TYPE,
EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME);
msg.getHeader().putProperty(Constants.PROTOCOL_VERSION,
((CloudEvent) message).getSpecVersion().toString());
+ msg.getHeader().putProperty(Constants.PROTOCOL_DESC, "tcp");
+ byte[] bodyByte =
EventFormatProvider.getInstance().resolveFormat(((CloudEvent)
message).getDataContentType())
+ .serialize((CloudEvent) message);
+ msg.setBody(bodyByte);
} else if (message instanceof EventMeshMessage) {
msg.getHeader().putProperty(Constants.PROTOCOL_TYPE,
EventMeshCommon.EM_MESSAGE_PROTOCOL_NAME);
msg.getHeader().putProperty(Constants.PROTOCOL_VERSION,
SpecVersion.V1.toString());
+ msg.getHeader().putProperty(Constants.PROTOCOL_DESC, "tcp");
+ msg.setBody(message);
} else if (message instanceof Message) {
msg.getHeader().putProperty(Constants.PROTOCOL_TYPE,
EventMeshCommon.OPEN_MESSAGE_PROTOCOL_NAME);
// todo: this version need to be confirmed.
@@ -102,8 +112,7 @@ public class MessageUtils {
// unsupported protocol for server
throw new IllegalArgumentException("Unsupported message protocol");
}
- msg.getHeader().putProperty(Constants.PROTOCOL_DESC, "tcp");
- msg.setBody(message);
+
return msg;
}
diff --git
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java
index 9ea509b..1f1f7a0 100644
---
a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java
+++
b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java
@@ -24,6 +24,7 @@ import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.common.RequestContext;
import org.apache.eventmesh.client.tcp.common.TcpClient;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
+import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
@@ -37,12 +38,15 @@ import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.commons.collections4.CollectionUtils;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.core.provider.EventFormatProvider;
+import io.cloudevents.jackson.JsonFormat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
@@ -154,22 +158,29 @@ class CloudEventTCPSubClient extends TcpClient implements
EventMeshTCPSubClient<
protected void channelRead0(ChannelHandlerContext ctx, Package msg)
throws Exception {
Command cmd = msg.getHeader().getCmd();
log.info("|receive|type={}|msg={}", cmd, msg);
+ String protocolVersion =
msg.getHeader().getProperty(Constants.PROTOCOL_VERSION).toString();
if (cmd == Command.REQUEST_TO_CLIENT) {
Package pkg = requestToClientAck(msg);
if (callback != null) {
- callback.handle((CloudEvent) pkg.getBody(), ctx);
+ CloudEvent event =
EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE)
+
.deserialize(msg.getBody().toString().getBytes(StandardCharsets.UTF_8));
+ callback.handle(event, ctx);
}
send(pkg);
} else if (cmd == Command.ASYNC_MESSAGE_TO_CLIENT) {
Package pkg = asyncMessageAck(msg);
if (callback != null) {
- callback.handle((CloudEvent) msg, ctx);
+ CloudEvent event =
EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE)
+
.deserialize(msg.getBody().toString().getBytes(StandardCharsets.UTF_8));
+ callback.handle(event, ctx);
}
send(pkg);
} else if (cmd == Command.BROADCAST_MESSAGE_TO_CLIENT) {
Package pkg = broadcastMessageAck(msg);
if (callback != null) {
- callback.handle((CloudEvent) msg, ctx);
+ CloudEvent event =
EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE)
+
.deserialize(msg.getBody().toString().getBytes(StandardCharsets.UTF_8));
+ callback.handle(event, ctx);
}
send(pkg);
} else if (cmd == Command.SERVER_GOODBYE_REQUEST) {
@@ -190,24 +201,21 @@ class CloudEventTCPSubClient extends TcpClient implements
EventMeshTCPSubClient<
private Package requestToClientAck(Package tcpPackage) {
Package msg = new Package();
msg.setHeader(new Header(Command.REQUEST_TO_CLIENT_ACK, 0, null,
tcpPackage.getHeader().getSeq()));
- // todo: Transform json to CloudEvents
- msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(),
EventMeshMessage.class));
+ msg.setBody(tcpPackage.getBody());
return msg;
}
private Package asyncMessageAck(Package tcpPackage) {
Package msg = new Package();
msg.setHeader(new Header(Command.ASYNC_MESSAGE_TO_CLIENT_ACK, 0, null,
tcpPackage.getHeader().getSeq()));
- // todo: Transform to CloudEvents
- msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(),
EventMeshMessage.class));
+ msg.setBody(tcpPackage.getBody());
return msg;
}
private Package broadcastMessageAck(Package tcpPackage) {
Package msg = new Package();
msg.setHeader(new Header(Command.BROADCAST_MESSAGE_TO_CLIENT_ACK, 0,
null, tcpPackage.getHeader().getSeq()));
- // todo: Transform to CloudEvents
- msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(),
EventMeshMessage.class));
+ msg.setBody(tcpPackage.getBody());
return msg;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]