This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch cloudevents
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/cloudevents by this push:
     new 5d2721f  cloudevents protocol tcp pub/sub for sdk (#622)
5d2721f is described below

commit 5d2721ffe21e237a93f8ae7e71bcec52e64e75c8
Author: mike_xwm <[email protected]>
AuthorDate: Wed Dec 1 21:40:08 2021 +0800

    cloudevents protocol tcp pub/sub for sdk (#622)
    
    1.cloudevents protocol tcp pub/sub for sdk
---
 build.gradle                                       |  1 +
 .../eventmesh/common/protocol/tcp/codec/Codec.java |  8 +++-
 .../eventmesh/tcp/common/EventMeshTestUtils.java   | 10 ++++-
 .../eventmesh-protocol-cloudevents/build.gradle    |  1 +
 .../cloudevents/CloudEventsProtocolAdaptor.java    |  6 ++-
 .../http/SendMessageBatchProtocolResolver.java     | 17 ++++++++
 .../http/SendMessageBatchV2ProtocolResolver.java   | 17 ++++++++
 .../http/SendMessageRequestProtocolResolver.java   | 17 ++++++++
 .../resolver/tcp/TcpMessageProtocolResolver.java   | 49 +++++++++++++++++-----
 eventmesh-sdk-java/build.gradle                    |  1 +
 .../eventmesh/client/tcp/common/MessageUtils.java  | 13 +++++-
 .../impl/cloudevent/CloudEventTCPSubClient.java    | 26 ++++++++----
 12 files changed, 141 insertions(+), 25 deletions(-)

diff --git a/build.gradle b/build.gradle
index 2aa26fb..260c26b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -450,6 +450,7 @@ subprojects {
             dependency "org.powermock:powermock-api-mockito2:2.0.2"
 
             dependency "io.cloudevents:cloudevents-core:2.2.0"
+            dependency "io.cloudevents:cloudevents-json-jackson:2.2.0"
         }
     }
 }
\ No newline at end of file
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java
index 191a099..9570100 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/codec/Codec.java
@@ -76,7 +76,13 @@ public class Codec {
             final String bodyJson = pkg != null ? OBJECT_MAPPER.writeValueAsString(pkg.getBody()) : null;
 
             final byte[] headerData = serializeBytes(headerJson);
-            final byte[] bodyData = serializeBytes(bodyJson);
+//            final byte[] bodyData = serializeBytes(bodyJson);
+
+            byte[] bodyData = serializeBytes(bodyJson);
+
+            if (headerJson.contains("cloudevents")) {
+                bodyData = (byte[]) pkg.getBody();
+            }
 
             if (log.isDebugEnabled()) {
                 log.debug("Encoder headerJson={}|bodyJson={}", headerJson, bodyJson);
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
index 4359832..8944482 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/common/EventMeshTestUtils.java
@@ -36,6 +36,8 @@ import org.apache.eventmesh.common.utils.JsonUtils;
 
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -165,12 +167,16 @@ public class EventMeshTestUtils {
     }
 
     public static CloudEvent generateCloudEventV1() {
+        Map<String, String> content = new HashMap<>();
+        content.put("content", "testAsyncMessage");
+
         CloudEvent event = CloudEventBuilder.v1()
             .withId(UUID.randomUUID().toString())
-            .withSubject(TOPIC_PRX_WQ2ClientBroadCast)
+            .withSubject(TOPIC_PRX_WQ2ClientUniCast)
             .withSource(URI.create("/"))
+            .withDataContentType("application/cloudevents+json")
             .withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
-            .withData("testAsyncMessage".getBytes(StandardCharsets.UTF_8))
+            .withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
             .withExtension("ttl", "30000")
             .build();
         return event;
diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/build.gradle b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/build.gradle
index 4a82353..cf2b974 100644
--- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/build.gradle
+++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/build.gradle
@@ -18,6 +18,7 @@
 dependencies {
     compileOnly project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
     implementation "io.cloudevents:cloudevents-core"
+    implementation "io.cloudevents:cloudevents-json-jackson"
 
     testImplementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
     testImplementation "io.cloudevents:cloudevents-core"
diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java
index b670ef9..6e540e6 100644
--- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java
+++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/CloudEventsProtocolAdaptor.java
@@ -18,6 +18,8 @@
 package org.apache.eventmesh.protocol.cloudevents;
 
 import io.cloudevents.CloudEvent;
+import io.cloudevents.core.provider.EventFormatProvider;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.protocol.ProtocolTransportObject;
@@ -111,7 +113,9 @@ public class CloudEventsProtocolAdaptor<T extends ProtocolTransportObject>
             return httpCommand;
         } else if (StringUtils.equals("tcp", protocolDesc)) {
             Package pkg = new Package();
-            pkg.setBody(cloudEvent);
+            byte[] bodyByte = EventFormatProvider.getInstance().resolveFormat(cloudEvent.getDataContentType())
+                .serialize(cloudEvent);
+            pkg.setBody(bodyByte);
             return pkg;
         } else {
             throw new ProtocolHandleException(String.format("Unsupported protocolDesc: %s", protocolDesc));
diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchProtocolResolver.java
index 6ea8d26..2b4f180 100644
--- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchProtocolResolver.java
+++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchProtocolResolver.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.eventmesh.protocol.cloudevents.resolver.http;
 
 import io.cloudevents.CloudEvent;
diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java
index 2c33aec..a863bb5 100644
--- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java
+++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageBatchV2ProtocolResolver.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.eventmesh.protocol.cloudevents.resolver.http;
 
 import io.cloudevents.CloudEvent;
diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageRequestProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageRequestProtocolResolver.java
index 7ef619a..6c80dd7 100644
--- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageRequestProtocolResolver.java
+++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/http/SendMessageRequestProtocolResolver.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.eventmesh.protocol.cloudevents.resolver.http;
 
 import io.cloudevents.CloudEvent;
diff --git a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java
index 0e2b303..bec9b8d 100644
--- a/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java
+++ b/eventmesh-protocol-plugin/eventmesh-protocol-cloudevents/src/main/java/org/apache/eventmesh/protocol/cloudevents/resolver/tcp/TcpMessageProtocolResolver.java
@@ -1,17 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.eventmesh.protocol.cloudevents.resolver.tcp;
 
-import io.cloudevents.CloudEvent;
-import io.cloudevents.SpecVersion;
-import io.cloudevents.core.builder.CloudEventBuilder;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.protocol.tcp.Header;
+import org.apache.eventmesh.common.utils.JsonUtils;
 import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
 import org.apache.eventmesh.protocol.cloudevents.CloudEventsProtocolConstant;
 
+import org.apache.commons.lang3.StringUtils;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.SpecVersion;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.core.provider.EventFormatProvider;
+import io.cloudevents.jackson.JsonFormat;
+
+import java.nio.charset.StandardCharsets;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+
 public class TcpMessageProtocolResolver {
 
-    public static CloudEvent buildEvent(Header header, String cloudEventJson) throws ProtocolHandleException {
+    public static CloudEvent buildEvent(Header header, String cloudEventJson)
+        throws ProtocolHandleException {
         CloudEventBuilder cloudEventBuilder;
 
         String protocolType = header.getProperty(Constants.PROTOCOL_TYPE).toString();
@@ -28,10 +55,11 @@ public class TcpMessageProtocolResolver {
         if (!StringUtils.equals(CloudEventsProtocolConstant.PROTOCOL_NAME, protocolType)) {
             throw new ProtocolHandleException(String.format("Unsupported protocolType: %s", protocolType));
         }
-        if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
-            // todo: transform cloudEventJson to cloudEvent
-            cloudEventBuilder = CloudEventBuilder.v1(null);
 
+        if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
+            CloudEvent event = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE).deserialize(cloudEventJson.getBytes(
+                StandardCharsets.UTF_8));
+            cloudEventBuilder = CloudEventBuilder.v1(event);
             for (String propKey : header.getProperties().keySet()) {
                 cloudEventBuilder.withExtension(propKey, header.getProperty(propKey).toString());
             }
@@ -39,8 +67,9 @@ public class TcpMessageProtocolResolver {
             return cloudEventBuilder.build();
 
         } else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) {
-            // todo: transform cloudEventJson to cloudEvent
-            cloudEventBuilder = CloudEventBuilder.v03(null);
+            CloudEvent event = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE).deserialize(cloudEventJson.getBytes(
+                StandardCharsets.UTF_8));
+            cloudEventBuilder = CloudEventBuilder.v03(event);
 
             for (String propKey : header.getProperties().keySet()) {
                 cloudEventBuilder.withExtension(propKey, header.getProperty(propKey).toString());
diff --git a/eventmesh-sdk-java/build.gradle b/eventmesh-sdk-java/build.gradle
index 108e418..a60dc14 100644
--- a/eventmesh-sdk-java/build.gradle
+++ b/eventmesh-sdk-java/build.gradle
@@ -27,6 +27,7 @@ dependencies {
 
     // protocol
     implementation "io.cloudevents:cloudevents-core"
+    implementation "io.cloudevents:cloudevents-json-jackson"
     implementation "io.openmessaging:openmessaging-api"
 
     compileOnly 'org.projectlombok:lombok:1.18.22'
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
index 16cbeef..d1020d4 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java
@@ -27,13 +27,17 @@ import org.apache.eventmesh.common.protocol.tcp.Header;
 import org.apache.eventmesh.common.protocol.tcp.Package;
 import org.apache.eventmesh.common.protocol.tcp.Subscription;
 import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.common.utils.JsonUtils;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
 import io.cloudevents.CloudEvent;
 import io.cloudevents.SpecVersion;
+import io.cloudevents.core.provider.EventFormatProvider;
+import io.cloudevents.jackson.JsonFormat;
 import io.openmessaging.api.Message;
 
 public class MessageUtils {
@@ -91,9 +95,15 @@ public class MessageUtils {
         if (message instanceof CloudEvent) {
             msg.getHeader().putProperty(Constants.PROTOCOL_TYPE, EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME);
             msg.getHeader().putProperty(Constants.PROTOCOL_VERSION, ((CloudEvent) message).getSpecVersion().toString());
+            msg.getHeader().putProperty(Constants.PROTOCOL_DESC, "tcp");
+            byte[] bodyByte = EventFormatProvider.getInstance().resolveFormat(((CloudEvent) message).getDataContentType())
+                .serialize((CloudEvent) message);
+            msg.setBody(bodyByte);
         } else if (message instanceof EventMeshMessage) {
             msg.getHeader().putProperty(Constants.PROTOCOL_TYPE, EventMeshCommon.EM_MESSAGE_PROTOCOL_NAME);
             msg.getHeader().putProperty(Constants.PROTOCOL_VERSION, SpecVersion.V1.toString());
+            msg.getHeader().putProperty(Constants.PROTOCOL_DESC, "tcp");
+            msg.setBody(message);
         } else if (message instanceof Message) {
             msg.getHeader().putProperty(Constants.PROTOCOL_TYPE, EventMeshCommon.OPEN_MESSAGE_PROTOCOL_NAME);
             // todo: this version need to be confirmed.
@@ -102,8 +112,7 @@ public class MessageUtils {
             // unsupported protocol for server
             throw new IllegalArgumentException("Unsupported message protocol");
         }
-        msg.getHeader().putProperty(Constants.PROTOCOL_DESC, "tcp");
-        msg.setBody(message);
+
         return msg;
     }
 
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java
index 9ea509b..1f1f7a0 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/cloudevent/CloudEventTCPSubClient.java
@@ -24,6 +24,7 @@ import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
 import org.apache.eventmesh.client.tcp.common.RequestContext;
 import org.apache.eventmesh.client.tcp.common.TcpClient;
 import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
+import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.exception.EventMeshException;
 import org.apache.eventmesh.common.protocol.SubscriptionItem;
 import org.apache.eventmesh.common.protocol.SubscriptionMode;
@@ -37,12 +38,15 @@ import org.apache.eventmesh.common.utils.JsonUtils;
 
 import org.apache.commons.collections4.CollectionUtils;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
 import io.cloudevents.CloudEvent;
 import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.core.provider.EventFormatProvider;
+import io.cloudevents.jackson.JsonFormat;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import lombok.extern.slf4j.Slf4j;
@@ -154,22 +158,29 @@ class CloudEventTCPSubClient extends TcpClient implements EventMeshTCPSubClient<
         protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Exception {
             Command cmd = msg.getHeader().getCmd();
             log.info("|receive|type={}|msg={}", cmd, msg);
+            String protocolVersion = msg.getHeader().getProperty(Constants.PROTOCOL_VERSION).toString();
             if (cmd == Command.REQUEST_TO_CLIENT) {
                 Package pkg = requestToClientAck(msg);
                 if (callback != null) {
-                    callback.handle((CloudEvent) pkg.getBody(), ctx);
+                    CloudEvent event = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE)
+                        .deserialize(msg.getBody().toString().getBytes(StandardCharsets.UTF_8));
+                    callback.handle(event, ctx);
                 }
                 send(pkg);
             } else if (cmd == Command.ASYNC_MESSAGE_TO_CLIENT) {
                 Package pkg = asyncMessageAck(msg);
                 if (callback != null) {
-                    callback.handle((CloudEvent) msg, ctx);
+                    CloudEvent event = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE)
+                        .deserialize(msg.getBody().toString().getBytes(StandardCharsets.UTF_8));
+                    callback.handle(event, ctx);
                 }
                 send(pkg);
             } else if (cmd == Command.BROADCAST_MESSAGE_TO_CLIENT) {
                 Package pkg = broadcastMessageAck(msg);
                 if (callback != null) {
-                    callback.handle((CloudEvent) msg, ctx);
+                    CloudEvent event = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE)
+                        .deserialize(msg.getBody().toString().getBytes(StandardCharsets.UTF_8));
+                    callback.handle(event, ctx);
                 }
                 send(pkg);
             } else if (cmd == Command.SERVER_GOODBYE_REQUEST) {
@@ -190,24 +201,21 @@ class CloudEventTCPSubClient extends TcpClient implements EventMeshTCPSubClient<
     private Package requestToClientAck(Package tcpPackage) {
         Package msg = new Package();
         msg.setHeader(new Header(Command.REQUEST_TO_CLIENT_ACK, 0, null, tcpPackage.getHeader().getSeq()));
-        // todo: Transform json to CloudEvents
-        msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(), EventMeshMessage.class));
+        msg.setBody(tcpPackage.getBody());
         return msg;
     }
 
     private Package asyncMessageAck(Package tcpPackage) {
         Package msg = new Package();
         msg.setHeader(new Header(Command.ASYNC_MESSAGE_TO_CLIENT_ACK, 0, null, tcpPackage.getHeader().getSeq()));
-        // todo: Transform to CloudEvents
-        msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(), EventMeshMessage.class));
+        msg.setBody(tcpPackage.getBody());
         return msg;
     }
 
     private Package broadcastMessageAck(Package tcpPackage) {
         Package msg = new Package();
         msg.setHeader(new Header(Command.BROADCAST_MESSAGE_TO_CLIENT_ACK, 0, null, tcpPackage.getHeader().getSeq()));
-        // todo: Transform to CloudEvents
-        msg.setBody(JsonUtils.deserialize(tcpPackage.getBody().toString(), EventMeshMessage.class));
+        msg.setBody(tcpPackage.getBody());
         return msg;
     }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to