This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to refs/heads/develop by this push:
     new e48987d  Feature/psa websocket improvements (#125)
e48987d is described below

commit e48987dee443d1571fdbab7bd96c35cab056c79f
Author: dhbfischer <[email protected]>
AuthorDate: Tue Oct 22 17:01:07 2019 +0200

    Feature/psa websocket improvements (#125)
    
    * Improved pubsub websocket with json header and added publisher example with webpage
    
    * Added missing publisher websocket directory
---
 bundles/pubsub/examples/CMakeLists.txt             |  20 ++
 bundles/pubsub/examples/pubsub/CMakeLists.txt      |   1 +
 .../examples/pubsub/publisher2/CMakeLists.txt      |   1 -
 .../CMakeLists.txt                                 |  27 +-
 .../publisher_websocket/resources/index.html       |  40 +++
 .../pubsub/publisher_websocket/resources/script.js |  38 +++
 .../src/pubsub_websocket_common.h                  |  13 +-
 .../src/pubsub_websocket_topic_receiver.c          | 306 ++++++---------------
 .../src/pubsub_websocket_topic_sender.c            | 162 ++---------
 9 files changed, 231 insertions(+), 377 deletions(-)

diff --git a/bundles/pubsub/examples/CMakeLists.txt b/bundles/pubsub/examples/CMakeLists.txt
index ec7e5a9..a36e87d 100644
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@ -391,3 +391,23 @@ if (BUILD_PUBSUB_PSA_NANOMSG)
     endif ()
 
 endif()
+
+# Example container for the websocket publisher: JSON serializer, topology
+# manager, websocket PubSub admin and the celix_pubsub_websocket_publisher
+# bundle, with the HTTP admin listening on port 7660.
+# NOTE(review): PSA_TCP_VERBOSE looks copied from the TCP example; no TCP admin
+# is part of this container - confirm the intended verbose key.
+add_celix_container(pubsub_publisher_websocket
+    GROUP pubsub
+    BUNDLES
+        Celix::log_service
+        Celix::shell
+        Celix::shell_tui
+        Celix::http_admin
+        Celix::pubsub_serializer_json
+        Celix::pubsub_topology_manager
+        Celix::pubsub_admin_websocket
+        celix_pubsub_websocket_publisher
+    PROPERTIES
+        PSA_TCP_VERBOSE=true
+        PUBSUB_ETCD_DISCOVERY_VERBOSE=true
+        PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
+        CELIX_HTTP_ADMIN_LISTENING_PORTS=7660
+        CELIX_HTTP_ADMIN_NUM_THREADS=5
+)
+target_link_libraries(pubsub_publisher_websocket
+    PRIVATE
+        ${PUBSUB_CONTAINER_LIBS}
+)
diff --git a/bundles/pubsub/examples/pubsub/CMakeLists.txt b/bundles/pubsub/examples/pubsub/CMakeLists.txt
index 73b3e52..5bfb06f 100644
--- a/bundles/pubsub/examples/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/CMakeLists.txt
@@ -19,6 +19,7 @@ include_directories("common/include")
 
 add_subdirectory(publisher)
 add_subdirectory(publisher2)
+add_subdirectory(publisher_websocket)
 add_subdirectory(subscriber)
 
 
diff --git a/bundles/pubsub/examples/pubsub/publisher2/CMakeLists.txt b/bundles/pubsub/examples/pubsub/publisher2/CMakeLists.txt
index a29968b..3927022 100644
--- a/bundles/pubsub/examples/pubsub/publisher2/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/publisher2/CMakeLists.txt
@@ -48,4 +48,3 @@ celix_bundle_files(celix_pubsub_poi_publisher2
     DESTINATION "META-INF/keys/subscriber"
 )
 
-target_link_libraries(celix_pubsub_poi_publisher2 PRIVATE Celix::framework)
diff --git a/bundles/pubsub/examples/pubsub/publisher2/CMakeLists.txt b/bundles/pubsub/examples/pubsub/publisher_websocket/CMakeLists.txt
similarity index 66%
copy from bundles/pubsub/examples/pubsub/publisher2/CMakeLists.txt
copy to bundles/pubsub/examples/pubsub/publisher_websocket/CMakeLists.txt
index a29968b..d4d5290 100644
--- a/bundles/pubsub/examples/pubsub/publisher2/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/publisher_websocket/CMakeLists.txt
@@ -15,37 +15,46 @@
 # specific language governing permissions and limitations
 # under the License.
 
-add_celix_bundle(celix_pubsub_poi_publisher2
-    SYMBOLIC_NAME "apache_celix_pubsub_poi_publisher2"
+
+find_package(CURL REQUIRED)
+find_package(OpenSSL REQUIRED)
+
+
+add_celix_bundle(celix_pubsub_websocket_publisher
+    SYMBOLIC_NAME "apache_celix_pubsub_websocket_publisher"
     VERSION "1.0.0"
     SOURCES 
         ../publisher/private/src/ps_pub_activator.c
         ../publisher/private/src/pubsub_publisher.c
 )
-target_link_libraries(celix_pubsub_poi_publisher2 PRIVATE Celix::framework 
Celix::pubsub_api)
-target_include_directories(celix_pubsub_poi_publisher2 PRIVATE 
../publisher/private/include)
+target_link_libraries(celix_pubsub_websocket_publisher PRIVATE 
Celix::framework Celix::pubsub_api ${CURL_LIBRARIES} ${OPENSSL_LIBRARIES})
+target_include_directories(celix_pubsub_websocket_publisher PRIVATE 
../publisher/private/include)
 
 
-celix_bundle_files(celix_pubsub_poi_publisher2
+celix_bundle_files(celix_pubsub_websocket_publisher
     
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
     
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
     DESTINATION "META-INF/descriptors"
 )
 
-celix_bundle_files(celix_pubsub_poi_publisher2
+celix_bundle_files(celix_pubsub_websocket_publisher
         
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi1.properties
         
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi2.properties
     DESTINATION "META-INF/topics/pub"
 )
 
-celix_bundle_files(celix_pubsub_poi_publisher2
+celix_bundle_files(celix_pubsub_websocket_publisher
         ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/publisher
     DESTINATION "META-INF/keys"
 )
 
-celix_bundle_files(celix_pubsub_poi_publisher2
+celix_bundle_files(celix_pubsub_websocket_publisher
     ${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/subscriber/public
     DESTINATION "META-INF/keys/subscriber"
 )
 
-target_link_libraries(celix_pubsub_poi_publisher2 PRIVATE Celix::framework)
+celix_bundle_add_dir(celix_pubsub_websocket_publisher resources DESTINATION 
".")
+
+celix_bundle_headers(celix_pubsub_websocket_publisher
+    "X-Web-Resource: /example$<SEMICOLON>/resources"
+)
diff --git a/bundles/pubsub/examples/pubsub/publisher_websocket/resources/index.html b/bundles/pubsub/examples/pubsub/publisher_websocket/resources/index.html
new file mode 100644
index 0000000..e1ecb3d
--- /dev/null
+++ b/bundles/pubsub/examples/pubsub/publisher_websocket/resources/index.html
@@ -0,0 +1,40 @@
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="utf-8"/>
+    <title>Apache Celix Websocket publisher example</title>
+    <script src="script.js"></script>
+</head>
+<body>
+    <div>
+        <h1>Poi1 message:</h1>
+        <br/>
+        <p id="receivePoi1"></p>
+    </div>
+    <br/>
+    <div>
+        <h1>Poi2 message:</h1>
+        <br/>
+        <p id="receivePoi2"></p>
+    </div>
+    <script>docReady();</script>
+</body>
+</html>
diff --git a/bundles/pubsub/examples/pubsub/publisher_websocket/resources/script.js b/bundles/pubsub/examples/pubsub/publisher_websocket/resources/script.js
new file mode 100644
index 0000000..81026a4
--- /dev/null
+++ b/bundles/pubsub/examples/pubsub/publisher_websocket/resources/script.js
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Opens one websocket per poi topic (poi1, poi2) on the host that served this
+ * page and shows the most recently received message of each topic in the
+ * matching "receivePoi1" / "receivePoi2" element.
+ */
+function docReady() {
+    // Follow the page's scheme so the sockets are not blocked on an https page.
+    var scheme = (window.location.protocol === "https:") ? "wss://" : "ws://";
+    var baseUrl = scheme + window.location.host + "/pubsub/default/";
+
+    // Connects to one topic and renders every message it delivers into elementId.
+    function subscribe(topic, elementId) {
+        var socket = new WebSocket(baseUrl + topic);
+        socket.onmessage = function (event) {
+            console.log(event);
+            var obj;
+            try {
+                obj = JSON.parse(event.data);
+            } catch (e) {
+                console.error("Ignoring non-JSON message on topic " + topic, e);
+                return;
+            }
+            if (obj === null || typeof obj !== "object") {
+                return;
+            }
+            var loc = (obj.data && obj.data.location) || {};
+            // innerText renders "\n" as a line break and never interprets the
+            // received values as HTML, unlike innerHTML.
+            document.getElementById(elementId).innerText = "Received " + obj.id + " message with sequence nr: "
+                + obj.seqNr + "\nlatitude = " + JSON.stringify(loc.lat) + "\nlongitude = " + JSON.stringify(loc.lon);
+        };
+        return socket;
+    }
+
+    subscribe("poi1", "receivePoi1");
+    subscribe("poi2", "receivePoi2");
+}
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
index 9f918f6..4c22319 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
@@ -27,25 +27,14 @@
 
 
 struct pubsub_websocket_msg_header {
-    uint32_t type; //msg type id (hash of fqn)
+    const char *id; //FQN
     uint8_t major;
     uint8_t minor;
     uint32_t seqNr;
-    unsigned char originUUID[16];
-    uint64_t sendtimeSeconds; //seconds since epoch
-    uint64_t sendTimeNanoseconds; //ns since epoch
 };
 
 typedef struct pubsub_websocket_msg_header pubsub_websocket_msg_header_t;
 
-struct pubsub_websocket_msg {
-    pubsub_websocket_msg_header_t header;
-    unsigned int payloadSize;
-    char payload[];
-};
-
-typedef struct pubsub_websocket_msg pubsub_websocket_msg_t;
-
 void psa_websocket_setScopeAndTopicFilter(const char* scope, const char 
*topic, char *filter);
 char *psa_websocket_createURI(const char *scope, const char *topic);
 
diff --git a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
index 1c9e4a0..1c90f3c 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
@@ -33,8 +33,8 @@
 #include "pubsub_websocket_common.h"
 
 #include <uuid/uuid.h>
-#include <pubsub_admin_metrics.h>
 #include <http_admin/api.h>
+#include <jansson.h>
 
 #ifndef UUID_STR_LEN
 #define UUID_STR_LEN 37
@@ -52,10 +52,14 @@
 
 typedef struct pubsub_websocket_rcv_buffer {
     celix_thread_mutex_t mutex;
-    celix_array_list_t *list;     //List of received websocket messages (type: 
pubsub_websocket_msg_t *)
-    celix_array_list_t *rcvTimes; //Corresponding receive times of the 
received websocket messages (rcvTimes[i] -> list[i])
+    celix_array_list_t *list;     //List of received websocket messages (type: 
pubsub_websocket_msg_entry_t *)
 } pubsub_websocket_rcv_buffer_t;
 
+typedef struct pubsub_websocket_msg_entry {
+    size_t msgSize;
+    const char *msgData;
+} pubsub_websocket_msg_entry_t;
+
 struct pubsub_websocket_topic_receiver {
     celix_bundle_context_t *ctx;
     log_helper_t *logHelper;
@@ -65,7 +69,6 @@ struct pubsub_websocket_topic_receiver {
     char *topic;
     char scopeAndTopicFilter[5];
     char *uri;
-    bool metricsEnabled;
 
     pubsub_websocket_rcv_buffer_t recvBuffer;
 
@@ -101,26 +104,9 @@ typedef struct psa_websocket_requested_connection_entry {
     bool statically; //true if the connection is statically configured through 
the topic properties.
 } psa_websocket_requested_connection_entry_t;
 
-typedef struct psa_websocket_subscriber_metrics_entry_t {
-    unsigned int msgTypeId;
-    uuid_t origin;
-
-    unsigned long nrOfMessagesReceived;
-    unsigned long nrOfSerializationErrors;
-    struct timespec lastMessageReceived;
-    double averageTimeBetweenMessagesInSeconds;
-    double averageSerializationTimeInSeconds;
-    double averageDelayInSeconds;
-    double maxDelayInSeconds;
-    double minDelayInSeconds;
-    unsigned int lastSeqNr;
-    unsigned long nrOfMissingSeqNumbers;
-} psa_websocket_subscriber_metrics_entry_t;
-
 typedef struct psa_websocket_subscriber_entry {
     int usageCount;
-    hash_map_t *msgTypes; //map from serializer svc
-    hash_map_t *metrics; //key = msg type id, value = hash_map (key = origin 
uuid, value = psa_websocket_subscriber_metrics_entry_t*
+    hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
     pubsub_subscriber_t *svc;
     bool initialized; //true if the init function is called through the 
receive thread
 } psa_websocket_subscriber_entry_t;
@@ -131,6 +117,7 @@ static void 
pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *s
 static void* psa_websocket_recvThread(void * data);
 static void 
psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topic_receiver_t
 *receiver);
 static void 
psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiver_t 
*receiver);
+static void *psa_websocket_getMsgTypeIdFromFqn(const char *fqn, hash_map_t 
*msg_type_id_map);
 
 static int psa_websocketTopicReceiver_data(struct mg_connection *connection, 
int op_code, char *data, size_t length, void *handle);
 static void psa_websocketTopicReceiver_close(const struct mg_connection 
*connection, void *handle);
@@ -151,7 +138,6 @@ pubsub_websocket_topic_receiver_t* 
pubsub_websocketTopicReceiver_create(celix_bu
     receiver->scope = strndup(scope, 1024 * 1024);
     receiver->topic = strndup(topic, 1024 * 1024);
     psa_websocket_setScopeAndTopicFilter(scope, topic, 
receiver->scopeAndTopicFilter);
-    receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, 
PSA_WEBSOCKET_METRICS_ENABLED, PSA_WEBSOCKET_DEFAULT_METRICS_ENABLED);
 
     receiver->uri = psa_websocket_createURI(scope, topic);
 
@@ -164,7 +150,6 @@ pubsub_websocket_topic_receiver_t* 
pubsub_websocketTopicReceiver_create(celix_bu
         receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
         receiver->requestedConnections.map = hashMap_create(utils_stringHash, 
NULL, utils_stringEquals, NULL);
         arrayList_create(&receiver->recvBuffer.list);
-        arrayList_create(&receiver->recvBuffer.rcvTimes);
     }
 
     //track subscribers
@@ -259,13 +244,6 @@ void 
pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
         while (hashMapIterator_hasNext(&iter)) {
             psa_websocket_subscriber_entry_t *entry = 
hashMapIterator_nextValue(&iter);
             if (entry != NULL)  {
-                hash_map_iterator_t iter2 = 
hashMapIterator_construct(entry->metrics);
-                while (hashMapIterator_hasNext(&iter2)) {
-                    hash_map_t *origins = hashMapIterator_nextValue(&iter2);
-                    hashMap_destroy(origins, true, true);
-                }
-                hashMap_destroy(entry->metrics, false, false);
-
                 
receiver->serializer->destroySerializerMap(receiver->serializer->handle, 
entry->msgTypes);
                 free(entry);
             }
@@ -300,20 +278,13 @@ void 
pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
         celixThreadMutex_destroy(&receiver->recvBuffer.mutex);
         int msgBufSize = celix_arrayList_size(receiver->recvBuffer.list);
         while(msgBufSize > 0) {
-            pubsub_websocket_msg_t *msg = 
celix_arrayList_get(receiver->recvBuffer.list, msgBufSize - 1);
+            pubsub_websocket_msg_entry_t *msg = 
celix_arrayList_get(receiver->recvBuffer.list, msgBufSize - 1);
+            free((void *) msg->msgData);
             free(msg);
             msgBufSize--;
         }
         celix_arrayList_destroy(receiver->recvBuffer.list);
 
-        int rcvTimesSize = celix_arrayList_size(receiver->recvBuffer.rcvTimes);
-        while(rcvTimesSize > 0) {
-            struct timespec *time = 
celix_arrayList_get(receiver->recvBuffer.rcvTimes, rcvTimesSize - 1);
-            free(time);
-            rcvTimesSize--;
-        }
-        celix_arrayList_destroy(receiver->recvBuffer.rcvTimes);
-
         free(receiver->uri);
         free(receiver->scope);
         free(receiver->topic);
@@ -416,16 +387,6 @@ static void 
pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc,
         int rc = 
receiver->serializer->createSerializerMap(receiver->serializer->handle, 
(celix_bundle_t*)bnd, &entry->msgTypes);
 
         if (rc == 0) {
-            entry->metrics = hashMap_create(NULL, NULL, NULL, NULL);
-            hash_map_iterator_t iter = 
hashMapIterator_construct(entry->msgTypes);
-            while (hashMapIterator_hasNext(&iter)) {
-                pubsub_msg_serializer_t *msgSer = 
hashMapIterator_nextValue(&iter);
-                hash_map_t *origins = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
-                hashMap_put(entry->metrics, (void*)(uintptr_t)msgSer->msgId, 
origins);
-            }
-        }
-
-        if (rc == 0) {
             hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
         } else {
             L_ERROR("[PSA_WEBSOCKET] Cannot create msg serializer map for 
TopicReceiver %s/%s", receiver->scope, receiver->topic);
@@ -452,118 +413,98 @@ static void 
pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *s
         if (rc != 0) {
             L_ERROR("[PSA_WEBSOCKET] Cannot destroy msg serializers map for 
TopicReceiver %s/%s", receiver->scope, receiver->topic);
         }
-        hash_map_iterator_t iter = hashMapIterator_construct(entry->metrics);
-        while (hashMapIterator_hasNext(&iter)) {
-            hash_map_t *origins = hashMapIterator_nextValue(&iter);
-            hashMap_destroy(origins, true, true);
-        }
-        hashMap_destroy(entry->metrics, false, false);
         free(entry);
     }
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
-static inline void 
processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_t *receiver, 
psa_websocket_subscriber_entry_t* entry, const pubsub_websocket_msg_header_t 
*hdr, const void* payload, size_t payloadSize, struct timespec *receiveTime) {
+/**
+ * Reverse lookup in a serializer map: returns the key of the entry whose
+ * serializer msgName equals fqn. Returns NULL when fqn or the map is NULL or
+ * when no serializer carries that name.
+ */
+static void * psa_websocket_getMsgTypeIdFromFqn(const char *fqn, hash_map_t *msg_type_id_map) {
+    if (fqn == NULL || msg_type_id_map == NULL) {
+        return NULL;
+    }
+
+    hash_map_iterator_t it = hashMapIterator_construct(msg_type_id_map);
+    while (hashMapIterator_hasNext(&it)) {
+        hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&it);
+        pubsub_msg_serializer_t *candidate = hashMapEntry_getValue(mapEntry);
+        if (strcmp(candidate->msgName, fqn) == 0) {
+            return hashMapEntry_getKey(mapEntry);
+        }
+    }
+
+    return NULL;
+}
+
+static inline void 
processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_t *receiver, 
psa_websocket_subscriber_entry_t* entry, pubsub_websocket_msg_header_t *hdr, 
const char* payload, size_t payloadSize) {
     //NOTE receiver->subscribers.mutex locked
-    pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, 
(void*)(uintptr_t)(hdr->type));
+    void *msgTypeId = psa_websocket_getMsgTypeIdFromFqn(hdr->id, 
entry->msgTypes);
+    pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, msgTypeId);
     pubsub_subscriber_t *svc = entry->svc;
-    bool monitor = receiver->metricsEnabled;
-
-    //monitoring
-    struct timespec beginSer;
-    struct timespec endSer;
-    int updateReceiveCount = 0;
-    int updateSerError = 0;
 
-    if (msgSer!= NULL) {
+    if (msgSer!= NULL && msgTypeId != 0) {
         void *deserializedMsg = NULL;
         bool validVersion = psa_websocket_checkVersion(msgSer->msgVersion, 
hdr);
         if (validVersion) {
-            if (monitor) {
-                clock_gettime(CLOCK_REALTIME, &beginSer);
-            }
             celix_status_t status = msgSer->deserialize(msgSer->handle, 
payload, payloadSize, &deserializedMsg);
-            if (monitor) {
-                clock_gettime(CLOCK_REALTIME, &endSer);
-            }
+
             if (status == CELIX_SUCCESS) {
                 bool release = true;
                 svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, 
deserializedMsg, &release);
                 if (release) {
                     msgSer->freeMsg(msgSer->handle, deserializedMsg);
                 }
-                updateReceiveCount += 1;
             } else {
-                updateSerError += 1;
                 L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for 
scope/topic %s/%s", msgSer->msgName, receiver->scope, receiver->topic);
             }
         }
     } else {
-        L_WARN("[PSA_WEBSOCKET_TR] Cannot find serializer for type id 0x%X", 
hdr->type);
-    }
-
-    if (msgSer != NULL && monitor) {
-        hash_map_t *origins = hashMap_get(entry->metrics, (void*)(uintptr_t 
)hdr->type);
-        char uuidStr[UUID_STR_LEN+1];
-        uuid_unparse(hdr->originUUID, uuidStr);
-        psa_websocket_subscriber_metrics_entry_t *metrics = 
hashMap_get(origins, uuidStr);
-
-        if (metrics == NULL) {
-            metrics = calloc(1, sizeof(*metrics));
-            hashMap_put(origins, strndup(uuidStr, UUID_STR_LEN+1), metrics);
-            uuid_copy(metrics->origin, hdr->originUUID);
-            metrics->msgTypeId = hdr->type;
-            metrics->maxDelayInSeconds = -INFINITY;
-            metrics->minDelayInSeconds = INFINITY;
-            metrics->lastSeqNr = 0;
-        }
-
-        double diff = celix_difftime(&beginSer, &endSer);
-        long n = metrics->nrOfMessagesReceived;
-        metrics->averageSerializationTimeInSeconds = 
(metrics->averageSerializationTimeInSeconds * n + diff) / (n+1);
-
-        diff = celix_difftime(&metrics->lastMessageReceived, receiveTime);
-        n = metrics->nrOfMessagesReceived;
-        if (metrics->nrOfMessagesReceived >= 1) {
-            metrics->averageTimeBetweenMessagesInSeconds = 
(metrics->averageTimeBetweenMessagesInSeconds * n + diff) / (n + 1);
-        }
-        metrics->lastMessageReceived = *receiveTime;
-
-
-        int incr = hdr->seqNr - metrics->lastSeqNr;
-        if (metrics->lastSeqNr >0 && incr > 1) {
-            metrics->nrOfMissingSeqNumbers += (incr - 1);
-            L_WARN("Missing message seq nr went from %i to %i", 
metrics->lastSeqNr, hdr->seqNr);
-        }
-        metrics->lastSeqNr = hdr->seqNr;
-
-        struct timespec sendTime;
-        sendTime.tv_sec = (time_t)hdr->sendtimeSeconds;
-        sendTime.tv_nsec = (long)hdr->sendTimeNanoseconds; //TODO FIXME the 
tv_nsec is not correct
-        diff = celix_difftime(&sendTime, receiveTime);
-        metrics->averageDelayInSeconds = (metrics->averageDelayInSeconds * n + 
diff) / (n+1);
-        if (diff < metrics->minDelayInSeconds) {
-            metrics->minDelayInSeconds = diff;
-        }
-        if (diff > metrics->maxDelayInSeconds) {
-            metrics->maxDelayInSeconds = diff;
-        }
-
-        metrics->nrOfMessagesReceived += updateReceiveCount;
-        metrics->nrOfSerializationErrors += updateSerError;
+        L_WARN("[PSA_WEBSOCKET_TR] Cannot find serializer for type id 0x%X, 
fqn %s", msgTypeId, hdr->id);
     }
 }
 
-static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver, 
const pubsub_websocket_msg_header_t *hdr, const char *payload, size_t 
payloadSize, struct timespec *receiveTime) {
-    celixThreadMutex_lock(&receiver->subscribers.mutex);
-    hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->subscribers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        psa_websocket_subscriber_entry_t *entry = 
hashMapIterator_nextValue(&iter);
-        if (entry != NULL) {
-            processMsgForSubscriberEntry(receiver, entry, hdr, payload, 
payloadSize, receiveTime);
+/**
+ * Parses one raw websocket frame as a JSON envelope with the members
+ * "id" (message fqn), "major", "minor", "seqNr" and "data", then offers the
+ * re-serialized "data" member to every registered subscriber entry.
+ * Frames that are not valid JSON or miss one of the members are logged and dropped.
+ */
+static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver, const char *msg, size_t msgSize) {
+    json_error_t error;
+    json_t *jsMsg = json_loadb(msg, msgSize, 0, &error);
+    if(jsMsg != NULL) {
+        //Borrowed references: owned by jsMsg, valid until the json_decref below.
+        json_t *jsId = json_object_get(jsMsg, "id");
+        json_t *jsMajor = json_object_get(jsMsg, "major");
+        json_t *jsMinor = json_object_get(jsMsg, "minor");
+        json_t *jsSeqNr = json_object_get(jsMsg, "seqNr");
+        json_t *jsData = json_object_get(jsMsg, "data");
+
+        if (json_is_string(jsId) && jsMajor && jsMinor && jsSeqNr && jsData) {
+            pubsub_websocket_msg_header_t hdr;
+            hdr.id = json_string_value(jsId); //points into jsId, must not be freed here
+            hdr.major = (uint8_t) json_integer_value(jsMajor);
+            hdr.minor = (uint8_t) json_integer_value(jsMinor);
+            hdr.seqNr = (uint32_t) json_integer_value(jsSeqNr);
+            //json_dumps hands ownership of the string to the caller and returns NULL on error.
+            char *payload = json_dumps(jsData, 0);
+            if (payload != NULL) {
+                size_t payloadSize = strlen(payload);
+                printf("Received msg: id %s\tmajor %u\tminor %u\tseqNr %u\tdata %s\n", hdr.id, hdr.major, hdr.minor, hdr.seqNr, payload);
+
+                celixThreadMutex_lock(&receiver->subscribers.mutex);
+                hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
+                while (hashMapIterator_hasNext(&iter)) {
+                    psa_websocket_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
+                    if (entry != NULL) {
+                        processMsgForSubscriberEntry(receiver, entry, &hdr, payload, payloadSize);
+                    }
+                }
+                celixThreadMutex_unlock(&receiver->subscribers.mutex);
+                free(payload);
+            } else {
+                L_WARN("[PSA_WEBSOCKET_TR] Cannot encode data member of message %s", hdr.id);
+            }
+        } else {
+            L_WARN("[PSA_WEBSOCKET_TR] Received unsupported message: "
+                   "ID = %s, major = %d, minor = %d, seqNr = %d, data valid? %s",
+                   (json_is_string(jsId) ? json_string_value(jsId) : "ERROR"),
+                   (int) json_integer_value(jsMajor), (int) json_integer_value(jsMinor),
+                   (int) json_integer_value(jsSeqNr), (jsData ? "TRUE" : "FALSE"));
         }
+        //Releases the parsed document together with every borrowed member, hdr.id included.
+        json_decref(jsMsg);
+    } else {
+        L_WARN("[PSA_WEBSOCKET_TR] Failed to load websocket JSON message, error at line %d: %s", error.line, error.text);
     }
-    celixThreadMutex_unlock(&receiver->subscribers.mutex);
+
 }
 
 static void* psa_websocket_recvThread(void * data) {
@@ -592,18 +533,13 @@ static void* psa_websocket_recvThread(void * data) {
 
         while(celix_arrayList_size(receiver->recvBuffer.list) > 0) {
             celixThreadMutex_lock(&receiver->recvBuffer.mutex);
-            pubsub_websocket_msg_t *msg = (pubsub_websocket_msg_t *) 
celix_arrayList_get(receiver->recvBuffer.list, 0);
-            struct timespec *rcvTime = (struct timespec *) 
celix_arrayList_get(receiver->recvBuffer.rcvTimes, 0);
+            pubsub_websocket_msg_entry_t *msg = (pubsub_websocket_msg_entry_t 
*) celix_arrayList_get(receiver->recvBuffer.list, 0);
+            celix_arrayList_removeAt(receiver->recvBuffer.list, 0);
             celixThreadMutex_unlock(&receiver->recvBuffer.mutex);
 
-            processMsg(receiver, &msg->header, msg->payload, msg->payloadSize, 
rcvTime);
+            processMsg(receiver, msg->msgData, msg->msgSize);
+            free((void *)msg->msgData);
             free(msg);
-            free(rcvTime);
-
-            celixThreadMutex_lock(&receiver->recvBuffer.mutex);
-            celix_arrayList_removeAt(receiver->recvBuffer.list, 0);
-            celix_arrayList_removeAt(receiver->recvBuffer.rcvTimes, 0);
-            celixThreadMutex_unlock(&receiver->recvBuffer.mutex);
         }
 
         celixThreadMutex_lock(&receiver->recvThread.mutex);
@@ -631,21 +567,14 @@ static int psa_websocketTopicReceiver_data(struct 
mg_connection *connection __at
     if (handle != NULL) {
         psa_websocket_requested_connection_entry_t *entry = 
(psa_websocket_requested_connection_entry_t *) handle;
 
-        pubsub_websocket_msg_t *rcvdMsg = malloc(length);
-        memcpy(rcvdMsg, data, length);
-
-        //Check if payload is completely received
-        unsigned long rcvdPayloadSize = length - sizeof(rcvdMsg->header) - 
sizeof(rcvdMsg->payloadSize);
-        if(rcvdMsg->payloadSize == rcvdPayloadSize) {
-            celixThreadMutex_lock(&entry->recvBuffer->mutex);
-            celix_arrayList_add(entry->recvBuffer->list, rcvdMsg);
-            struct timespec *receiveTime = malloc(sizeof(*receiveTime));
-            clock_gettime(CLOCK_REALTIME, receiveTime);
-            celix_arrayList_add(entry->recvBuffer->rcvTimes, receiveTime);
-            celixThreadMutex_unlock(&entry->recvBuffer->mutex);
-        } else {
-            free(rcvdMsg);
-        }
+        celixThreadMutex_lock(&entry->recvBuffer->mutex);
+        pubsub_websocket_msg_entry_t *msg = malloc(sizeof(*msg));
+        const char *rcvdMsgData = malloc(length);
+        memcpy((void *) rcvdMsgData, data, length);
+        msg->msgData = rcvdMsgData;
+        msg->msgSize = length;
+        celix_arrayList_add(entry->recvBuffer->list, msg);
+        celixThreadMutex_unlock(&entry->recvBuffer->mutex);
     }
 
     return 1; //keep open (non-zero), 0 to close the socket
@@ -660,67 +589,6 @@ static void psa_websocketTopicReceiver_close(const struct 
mg_connection *connect
     }
 }
 
-pubsub_admin_receiver_metrics_t* 
pubsub_websocketTopicReceiver_metrics(pubsub_websocket_topic_receiver_t 
*receiver) {
-    pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
-    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", 
receiver->scope);
-    snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", 
receiver->topic);
-
-    size_t msgTypesCount = 0;
-    celixThreadMutex_lock(&receiver->subscribers.mutex);
-    hash_map_iterator_t iter = 
hashMapIterator_construct(receiver->subscribers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        psa_websocket_subscriber_entry_t *entry = 
hashMapIterator_nextValue(&iter);
-        hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
-        while (hashMapIterator_hasNext(&iter2)) {
-            hashMapIterator_nextValue(&iter2);
-            msgTypesCount += 1;
-        }
-    }
-
-    result->nrOfMsgTypes = (unsigned long)msgTypesCount;
-    result->msgTypes = calloc(msgTypesCount, sizeof(*result->msgTypes));
-    int i = 0;
-    iter = hashMapIterator_construct(receiver->subscribers.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        psa_websocket_subscriber_entry_t *entry = 
hashMapIterator_nextValue(&iter);
-        hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
-        while (hashMapIterator_hasNext(&iter2)) {
-            hash_map_t *origins = hashMapIterator_nextValue(&iter2);
-            result->msgTypes[i].origins = 
calloc((size_t)hashMap_size(origins), sizeof(*(result->msgTypes[i].origins)));
-            result->msgTypes[i].nrOfOrigins = hashMap_size(origins);
-            int k = 0;
-            hash_map_iterator_t iter3 = hashMapIterator_construct(origins);
-            while (hashMapIterator_hasNext(&iter3)) {
-                psa_websocket_subscriber_metrics_entry_t *metrics = 
hashMapIterator_nextValue(&iter3);
-                result->msgTypes[i].typeId = metrics->msgTypeId;
-                pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, 
(void*)(uintptr_t)metrics->msgTypeId);
-                if (msgSer) {
-                    snprintf(result->msgTypes[i].typeFqn, 
PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", msgSer->msgName);
-                    uuid_copy(result->msgTypes[i].origins[k].originUUID, 
metrics->origin);
-                    result->msgTypes[i].origins[k].nrOfMessagesReceived = 
metrics->nrOfMessagesReceived;
-                    result->msgTypes[i].origins[k].nrOfSerializationErrors = 
metrics->nrOfSerializationErrors;
-                    result->msgTypes[i].origins[k].averageDelayInSeconds = 
metrics->averageDelayInSeconds;
-                    result->msgTypes[i].origins[k].maxDelayInSeconds = 
metrics->maxDelayInSeconds;
-                    result->msgTypes[i].origins[k].minDelayInSeconds = 
metrics->minDelayInSeconds;
-                    
result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds = 
metrics->averageTimeBetweenMessagesInSeconds;
-                    
result->msgTypes[i].origins[k].averageSerializationTimeInSeconds = 
metrics->averageSerializationTimeInSeconds;
-                    result->msgTypes[i].origins[k].lastMessageReceived = 
metrics->lastMessageReceived;
-                    result->msgTypes[i].origins[k].nrOfMissingSeqNumbers = 
metrics->nrOfMissingSeqNumbers;
-
-                    k += 1;
-                } else {
-                    L_WARN("[PSA_WEBSOCKET]: Error cannot find key 0x%X in msg 
map during metrics collection!\n", metrics->msgTypeId);
-                }
-            }
-            i +=1 ;
-        }
-    }
-
-    celixThreadMutex_unlock(&receiver->subscribers.mutex);
-
-    return result;
-}
-
 
 static void 
psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topic_receiver_t
 *receiver) {
     celixThreadMutex_lock(&receiver->requestedConnections.mutex);
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c 
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
index 498b3e3..d3b484c 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
@@ -29,6 +29,7 @@
 #include "pubsub_psa_websocket_constants.h"
 #include "pubsub_websocket_common.h"
 #include <uuid/uuid.h>
+#include <jansson.h>
 #include "celix_constants.h"
 #include "http_admin/api.h"
 #include "civetweb.h"
@@ -49,8 +50,6 @@ struct pubsub_websocket_topic_sender {
     log_helper_t *logHelper;
     long serializerSvcId;
     pubsub_serializer_service_t *serializer;
-    uuid_t fwUUID;
-    bool metricsEnabled;
 
     char *scope;
     char *topic;
@@ -75,17 +74,7 @@ struct pubsub_websocket_topic_sender {
 typedef struct psa_websocket_send_msg_entry {
     pubsub_websocket_msg_header_t header; //partially filled header (only 
seqnr and time needs to be updated per send)
     pubsub_msg_serializer_t *msgSer;
-    celix_thread_mutex_t sendLock; //protects send & Seqnr
-    unsigned int seqNr;
-    struct {
-        celix_thread_mutex_t mutex; //protects entries in struct
-        unsigned long nrOfMessagesSend;
-        unsigned long nrOfMessagesSendFailed;
-        unsigned long nrOfSerializationErrors;
-        struct timespec lastMessageSend;
-        double averageTimeBetweenMessagesInSeconds;
-        double averageSerializationTimeInSeconds;
-    } metrics;
+    celix_thread_mutex_t sendLock; //protects send & header(.seqNr)
 } psa_websocket_send_msg_entry_t;
 
 typedef struct psa_websocket_bounded_service_entry {
@@ -121,12 +110,6 @@ pubsub_websocket_topic_sender_t* 
pubsub_websocketTopicSender_create(
     sender->serializerSvcId = serializerSvcId;
     sender->serializer = ser;
     psa_websocket_setScopeAndTopicFilter(scope, topic, 
sender->scopeAndTopicFilter);
-    const char* uuid = celix_bundleContext_getProperty(ctx, 
OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
-    if (uuid != NULL) {
-        uuid_parse(uuid, sender->fwUUID);
-    }
-    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, 
PSA_WEBSOCKET_METRICS_ENABLED, PSA_WEBSOCKET_DEFAULT_METRICS_ENABLED);
-
     sender->uri = psa_websocket_createURI(scope, topic);
 
     if (sender->uri != NULL) {
@@ -191,7 +174,6 @@ void 
pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender
                 hash_map_iterator_t iter2 = 
hashMapIterator_construct(entry->msgEntries);
                 while (hashMapIterator_hasNext(&iter2)) {
                     psa_websocket_send_msg_entry_t *msgEntry = 
hashMapIterator_nextValue(&iter2);
-                    celixThreadMutex_destroy(&msgEntry->metrics.mutex);
                     free(msgEntry);
 
                 }
@@ -260,15 +242,13 @@ static void* psa_websocket_getPublisherService(void 
*handle, const celix_bundle_
                 void *key = hashMapEntry_getKey(hashMapEntry);
                 psa_websocket_send_msg_entry_t *sendEntry = calloc(1, 
sizeof(*sendEntry));
                 sendEntry->msgSer = hashMapEntry_getValue(hashMapEntry);
-                sendEntry->header.type = (int32_t)sendEntry->msgSer->msgId;
+                sendEntry->header.id = sendEntry->msgSer->msgName;
                 int major;
                 int minor;
                 version_getMajor(sendEntry->msgSer->msgVersion, &major);
                 version_getMinor(sendEntry->msgSer->msgVersion, &minor);
                 sendEntry->header.major = (uint8_t)major;
                 sendEntry->header.minor = (uint8_t)minor;
-                uuid_copy(sendEntry->header.originUUID, sender->fwUUID);
-                celixThreadMutex_create(&sendEntry->metrics.mutex, NULL);
                 hashMap_put(entry->msgEntries, key, sendEntry);
                 hashMap_put(entry->msgTypeIds, 
strndup(sendEntry->msgSer->msgName, 1024), (void *)(uintptr_t) 
sendEntry->msgSer->msgId);
             }
@@ -305,7 +285,6 @@ static void psa_websocket_ungetPublisherService(void 
*handle, const celix_bundle
         hash_map_iterator_t iter = 
hashMapIterator_construct(entry->msgEntries);
         while (hashMapIterator_hasNext(&iter)) {
             psa_websocket_send_msg_entry_t *msgEntry = 
hashMapIterator_nextValue(&iter);
-            celixThreadMutex_destroy(&msgEntry->metrics.mutex);
             free(msgEntry);
         }
         hashMap_destroy(entry->msgEntries, false, false);
@@ -316,149 +295,60 @@ static void psa_websocket_ungetPublisherService(void 
*handle, const celix_bundle
     celixThreadMutex_unlock(&sender->boundedServices.mutex);
 }
 
-pubsub_admin_sender_metrics_t* 
pubsub_websocketTopicSender_metrics(pubsub_websocket_topic_sender_t *sender) {
-    pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result));
-    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", 
sender->scope);
-    snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", 
sender->topic);
-    celixThreadMutex_lock(&sender->boundedServices.mutex);
-    size_t count = 0;
-    hash_map_iterator_t iter = 
hashMapIterator_construct(sender->boundedServices.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        psa_websocket_bounded_service_entry_t *entry = 
hashMapIterator_nextValue(&iter);
-        hash_map_iterator_t iter2 = 
hashMapIterator_construct(entry->msgEntries);
-        while (hashMapIterator_hasNext(&iter2)) {
-            hashMapIterator_nextValue(&iter2);
-            count += 1;
-        }
-    }
-
-    result->msgMetrics = calloc(count, sizeof(*result));
-
-    iter = hashMapIterator_construct(sender->boundedServices.map);
-    int i = 0;
-    while (hashMapIterator_hasNext(&iter)) {
-        psa_websocket_bounded_service_entry_t *entry = 
hashMapIterator_nextValue(&iter);
-        hash_map_iterator_t iter2 = 
hashMapIterator_construct(entry->msgEntries);
-        while (hashMapIterator_hasNext(&iter2)) {
-            psa_websocket_send_msg_entry_t *mEntry = 
hashMapIterator_nextValue(&iter2);
-            celixThreadMutex_lock(&mEntry->metrics.mutex);
-            result->msgMetrics[i].nrOfMessagesSend = 
mEntry->metrics.nrOfMessagesSend;
-            result->msgMetrics[i].nrOfMessagesSendFailed = 
mEntry->metrics.nrOfMessagesSendFailed;
-            result->msgMetrics[i].nrOfSerializationErrors = 
mEntry->metrics.nrOfSerializationErrors;
-            result->msgMetrics[i].averageSerializationTimeInSeconds = 
mEntry->metrics.averageSerializationTimeInSeconds;
-            result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = 
mEntry->metrics.averageTimeBetweenMessagesInSeconds;
-            result->msgMetrics[i].lastMessageSend = 
mEntry->metrics.lastMessageSend;
-            result->msgMetrics[i].bndId = entry->bndId;
-            result->msgMetrics[i].typeId = mEntry->header.type;
-            snprintf(result->msgMetrics[i].typeFqn, 
PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", mEntry->msgSer->msgName);
-            i += 1;
-            celixThreadMutex_unlock(&mEntry->metrics.mutex);
-        }
-    }
-
-    celixThreadMutex_unlock(&sender->boundedServices.mutex);
-    result->nrOfmsgMetrics = (int)count;
-    return result;
-}
-
 static int psa_websocket_topicPublicationSend(void* handle, unsigned int 
msgTypeId, const void *inMsg) {
     int status = CELIX_SERVICE_EXCEPTION;
     psa_websocket_bounded_service_entry_t *bound = handle;
     pubsub_websocket_topic_sender_t *sender = bound->parent;
-    bool monitor = sender->metricsEnabled;
     psa_websocket_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, 
(void *) (uintptr_t) (msgTypeId));
 
-    //metrics updates
-    struct timespec sendTime;
-    struct timespec serializationStart;
-    struct timespec serializationEnd;
-    //int unknownMessageCountUpdate = 0;
-    int sendErrorUpdate = 0;
-    int serializationErrorUpdate = 0;
-    int sendCountUpdate = 0;
-
     if (sender->sockConnection != NULL && entry != NULL) {
         delay_first_send_for_late_joiners(sender);
 
-        if (monitor) {
-            clock_gettime(CLOCK_REALTIME, &serializationStart);
-        }
-
         void *serializedOutput = NULL;
         size_t serializedOutputLen = 0;
         status = entry->msgSer->serialize(entry->msgSer->handle, inMsg, 
&serializedOutput, &serializedOutputLen);
 
-        if (monitor) {
-            clock_gettime(CLOCK_REALTIME, &serializationEnd);
-        }
-
         if (status == CELIX_SUCCESS /*ser ok*/) {
+            json_error_t jsError;
             unsigned char *hdrEncoded = 
calloc(sizeof(pubsub_websocket_msg_header_t), sizeof(unsigned char));
 
             celixThreadMutex_lock(&entry->sendLock);
 
-            pubsub_websocket_msg_t *msg = malloc(sizeof(*msg) + 
sizeof(char[serializedOutputLen]));
-            pubsub_websocket_msg_header_t *msgHdr = &entry->header;
-            if (monitor) {
-                clock_gettime(CLOCK_REALTIME, &sendTime);
-                msgHdr->sendtimeSeconds = (uint64_t) sendTime.tv_sec;
-                msgHdr->sendTimeNanoseconds = (uint64_t) sendTime.tv_nsec;
-                msgHdr->seqNr++;
-            }
-            memcpy(&msg->header, msgHdr, 
sizeof(pubsub_websocket_msg_header_t));
-
-            msg->payloadSize = (unsigned int) serializedOutputLen;
-            size_t hdr_size = sizeof(msg->header);
-            size_t ps_size = sizeof(msg->payloadSize);
-            size_t bytes_to_write = hdr_size + ps_size + 
serializedOutputLen;//sizeof(*msg);
-            memcpy(msg->payload, serializedOutput, serializedOutputLen);
-            int bytes_written = 
mg_websocket_client_write(sender->sockConnection, MG_WEBSOCKET_OPCODE_TEXT, 
(char *) msg, bytes_to_write);
-
-            celixThreadMutex_unlock(&entry->sendLock);
-            if (bytes_written == (int) bytes_to_write) {
-                sendCountUpdate = 1;
+            json_t *jsMsg = json_object();
+            json_object_set_new(jsMsg, "id", json_string(entry->header.id));
+            json_object_set_new(jsMsg, "major", 
json_integer(entry->header.major));
+            json_object_set_new(jsMsg, "minor", 
json_integer(entry->header.minor));
+            json_object_set_new(jsMsg, "seqNr", 
json_integer(entry->header.seqNr++));
+
+            json_t *jsData;
+            jsData = json_loadb((const char *)serializedOutput, 
serializedOutputLen - 1, 0, &jsError);
+            if(jsData != NULL) {
+                json_object_set_new(jsMsg, "data", jsData);
+                const char *msg = json_dumps(jsMsg, 0);
+                size_t bytes_to_write = strlen(msg);
+                int bytes_written = 
mg_websocket_client_write(sender->sockConnection, MG_WEBSOCKET_OPCODE_TEXT, msg,
+                                                              bytes_to_write);
+                free((void *) msg);
+                json_decref(jsData); //Decrease ref count means freeing the 
object
+                if (bytes_written != (int) bytes_to_write) {
+                    L_WARN("[PSA_WEBSOCKET_TS] Error sending websocket, 
written %lu of total %lu bytes", bytes_written, bytes_to_write);
+                }
             } else {
-                sendErrorUpdate = 1;
-                L_WARN("[PSA_WEBSOCKET_TS] Error sending websocket.");
+                L_WARN("[PSA_WEBSOCKET_TS] Error sending websocket, serialized 
data corrupt. Error(%d;%d;%d): %s", jsError.column, jsError.line, 
jsError.position, jsError.text);
             }
+            celixThreadMutex_unlock(&entry->sendLock);
 
-            free(msg);
+            json_decref(jsMsg); //Decrease ref count means freeing the object
             free(hdrEncoded);
             free(serializedOutput);
         } else {
-            serializationErrorUpdate = 1;
             L_WARN("[PSA_WEBSOCKET_TS] Error serialize message of type %s for 
scope/topic %s/%s",
                    entry->msgSer->msgName, sender->scope, sender->topic);
         }
     } else if (entry == NULL){
-        //unknownMessageCountUpdate = 1;
         L_WARN("[PSA_WEBSOCKET_TS] Error sending message with msg type id %i 
for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic);
     }
 
-
-    if (monitor && status == CELIX_SUCCESS) {
-        celixThreadMutex_lock(&entry->metrics.mutex);
-
-        long n = entry->metrics.nrOfMessagesSend + 
entry->metrics.nrOfMessagesSendFailed;
-        double diff = celix_difftime(&serializationStart, &serializationEnd);
-        double average = (entry->metrics.averageSerializationTimeInSeconds * n 
+ diff) / (n+1);
-        entry->metrics.averageSerializationTimeInSeconds = average;
-
-        if (entry->metrics.nrOfMessagesSend > 2) {
-            diff = celix_difftime(&entry->metrics.lastMessageSend, &sendTime);
-            n = entry->metrics.nrOfMessagesSend;
-            average = (entry->metrics.averageTimeBetweenMessagesInSeconds * n 
+ diff) / (n+1);
-            entry->metrics.averageTimeBetweenMessagesInSeconds = average;
-        }
-
-        entry->metrics.lastMessageSend = sendTime;
-        entry->metrics.nrOfMessagesSend += sendCountUpdate;
-        entry->metrics.nrOfMessagesSendFailed += sendErrorUpdate;
-        entry->metrics.nrOfSerializationErrors += serializationErrorUpdate;
-
-        celixThreadMutex_unlock(&entry->metrics.mutex);
-    }
-
     return status;
 }
 

Reply via email to