This is an automated email from the ASF dual-hosted git repository.
pnoltes pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to refs/heads/develop by this push:
new e48987d Feature/psa websocket improvements (#125)
e48987d is described below
commit e48987dee443d1571fdbab7bd96c35cab056c79f
Author: dhbfischer <[email protected]>
AuthorDate: Tue Oct 22 17:01:07 2019 +0200
Feature/psa websocket improvements (#125)
* Improved pubsub websocket with json header and added publisher example
with webpage
* Added missing publisher websocket directory
---
bundles/pubsub/examples/CMakeLists.txt | 20 ++
bundles/pubsub/examples/pubsub/CMakeLists.txt | 1 +
.../examples/pubsub/publisher2/CMakeLists.txt | 1 -
.../CMakeLists.txt | 27 +-
.../publisher_websocket/resources/index.html | 40 +++
.../pubsub/publisher_websocket/resources/script.js | 38 +++
.../src/pubsub_websocket_common.h | 13 +-
.../src/pubsub_websocket_topic_receiver.c | 306 ++++++---------------
.../src/pubsub_websocket_topic_sender.c | 162 ++---------
9 files changed, 231 insertions(+), 377 deletions(-)
diff --git a/bundles/pubsub/examples/CMakeLists.txt
b/bundles/pubsub/examples/CMakeLists.txt
index ec7e5a9..a36e87d 100644
--- a/bundles/pubsub/examples/CMakeLists.txt
+++ b/bundles/pubsub/examples/CMakeLists.txt
@@ -391,3 +391,23 @@ if (BUILD_PUBSUB_PSA_NANOMSG)
endif ()
endif()
+
+add_celix_container(pubsub_publisher_websocket
+ GROUP pubsub
+ BUNDLES
+ Celix::log_service
+ Celix::shell
+ Celix::shell_tui
+ Celix::http_admin
+ Celix::pubsub_serializer_json
+ Celix::pubsub_topology_manager
+ Celix::pubsub_admin_websocket
+ celix_pubsub_websocket_publisher
+ PROPERTIES
+ PSA_TCP_VERBOSE=true
+ PUBSUB_ETCD_DISCOVERY_VERBOSE=true
+ PUBSUB_TOPOLOGY_MANAGER_VERBOSE=true
+ CELIX_HTTP_ADMIN_LISTENING_PORTS=7660
+ CELIX_HTTP_ADMIN_NUM_THREADS=5
+)
+target_link_libraries(pubsub_publisher_websocket PRIVATE
${PUBSUB_CONTAINER_LIBS})
diff --git a/bundles/pubsub/examples/pubsub/CMakeLists.txt
b/bundles/pubsub/examples/pubsub/CMakeLists.txt
index 73b3e52..5bfb06f 100644
--- a/bundles/pubsub/examples/pubsub/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/CMakeLists.txt
@@ -19,6 +19,7 @@ include_directories("common/include")
add_subdirectory(publisher)
add_subdirectory(publisher2)
+add_subdirectory(publisher_websocket)
add_subdirectory(subscriber)
diff --git a/bundles/pubsub/examples/pubsub/publisher2/CMakeLists.txt
b/bundles/pubsub/examples/pubsub/publisher2/CMakeLists.txt
index a29968b..3927022 100644
--- a/bundles/pubsub/examples/pubsub/publisher2/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/publisher2/CMakeLists.txt
@@ -48,4 +48,3 @@ celix_bundle_files(celix_pubsub_poi_publisher2
DESTINATION "META-INF/keys/subscriber"
)
-target_link_libraries(celix_pubsub_poi_publisher2 PRIVATE Celix::framework)
diff --git a/bundles/pubsub/examples/pubsub/publisher2/CMakeLists.txt
b/bundles/pubsub/examples/pubsub/publisher_websocket/CMakeLists.txt
similarity index 66%
copy from bundles/pubsub/examples/pubsub/publisher2/CMakeLists.txt
copy to bundles/pubsub/examples/pubsub/publisher_websocket/CMakeLists.txt
index a29968b..d4d5290 100644
--- a/bundles/pubsub/examples/pubsub/publisher2/CMakeLists.txt
+++ b/bundles/pubsub/examples/pubsub/publisher_websocket/CMakeLists.txt
@@ -15,37 +15,46 @@
# specific language governing permissions and limitations
# under the License.
-add_celix_bundle(celix_pubsub_poi_publisher2
- SYMBOLIC_NAME "apache_celix_pubsub_poi_publisher2"
+
+find_package(CURL REQUIRED)
+find_package(OpenSSL REQUIRED)
+
+
+add_celix_bundle(celix_pubsub_websocket_publisher
+ SYMBOLIC_NAME "apache_celix_pubsub_websocket_publisher"
VERSION "1.0.0"
SOURCES
../publisher/private/src/ps_pub_activator.c
../publisher/private/src/pubsub_publisher.c
)
-target_link_libraries(celix_pubsub_poi_publisher2 PRIVATE Celix::framework
Celix::pubsub_api)
-target_include_directories(celix_pubsub_poi_publisher2 PRIVATE
../publisher/private/include)
+target_link_libraries(celix_pubsub_websocket_publisher PRIVATE
Celix::framework Celix::pubsub_api ${CURL_LIBRARIES} ${OPENSSL_LIBRARIES})
+target_include_directories(celix_pubsub_websocket_publisher PRIVATE
../publisher/private/include)
-celix_bundle_files(celix_pubsub_poi_publisher2
+celix_bundle_files(celix_pubsub_websocket_publisher
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi1.descriptor
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/msg_poi2.descriptor
DESTINATION "META-INF/descriptors"
)
-celix_bundle_files(celix_pubsub_poi_publisher2
+celix_bundle_files(celix_pubsub_websocket_publisher
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi1.properties
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/pubsub/msg_descriptors/poi2.properties
DESTINATION "META-INF/topics/pub"
)
-celix_bundle_files(celix_pubsub_poi_publisher2
+celix_bundle_files(celix_pubsub_websocket_publisher
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/publisher
DESTINATION "META-INF/keys"
)
-celix_bundle_files(celix_pubsub_poi_publisher2
+celix_bundle_files(celix_pubsub_websocket_publisher
${PROJECT_SOURCE_DIR}/bundles/pubsub/examples/keys/subscriber/public
DESTINATION "META-INF/keys/subscriber"
)
-target_link_libraries(celix_pubsub_poi_publisher2 PRIVATE Celix::framework)
+celix_bundle_add_dir(celix_pubsub_websocket_publisher resources DESTINATION
".")
+
+celix_bundle_headers(celix_pubsub_websocket_publisher
+ "X-Web-Resource: /example$<SEMICOLON>/resources"
+)
diff --git
a/bundles/pubsub/examples/pubsub/publisher_websocket/resources/index.html
b/bundles/pubsub/examples/pubsub/publisher_websocket/resources/index.html
new file mode 100644
index 0000000..e1ecb3d
--- /dev/null
+++ b/bundles/pubsub/examples/pubsub/publisher_websocket/resources/index.html
@@ -0,0 +1,40 @@
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+<!DOCTYPE html>
+<html lang="en">
+<head>
+ <meta charset="utf-8"/>
+ <title>Apache Celix Websocket publisher example</title>
+ <script src="script.js"></script>
+</head>
+<body>
+ <div>
+ <h1>Poi1 message:</h1>
+ <br/>
+ <p id="receivePoi1"></p>
+ </div>
+ <br/>
+ <div>
+ <h1>Poi2 message:</h1>
+ <br/>
+ <p id="receivePoi2"></p>
+ </div>
+ <script>docReady();</script>
+</body>
+</html>
diff --git
a/bundles/pubsub/examples/pubsub/publisher_websocket/resources/script.js
b/bundles/pubsub/examples/pubsub/publisher_websocket/resources/script.js
new file mode 100644
index 0000000..81026a4
--- /dev/null
+++ b/bundles/pubsub/examples/pubsub/publisher_websocket/resources/script.js
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+function docReady() {
+ var host = window.location.host;
+ var shellSocketP1 = new WebSocket("ws://" + host + "/pubsub/default/poi1");
+ var shellSocketP2 = new WebSocket("ws://" + host + "/pubsub/default/poi2");
+
+ shellSocketP1.onmessage = function (event) {
+ console.log(event);
+ var obj = JSON.parse(event.data);
+ document.getElementById("receivePoi1").innerHTML = "Received " +
obj.id + " message with sequence nr: "
+ + obj.seqNr + "<br/>latitude = " +
JSON.stringify(obj.data.location.lat) + "<br/>longitude = " +
JSON.stringify(obj.data.location.lon);
+ };
+
+ shellSocketP2.onmessage = function (event) {
+ console.log(event);
+ var obj = JSON.parse(event.data);
+ document.getElementById("receivePoi2").innerHTML = "Received " +
obj.id + " message with sequence nr: "
+ + obj.seqNr + "<br/>latitude = " +
JSON.stringify(obj.data.location.lat) + "<br/>longitude = " +
JSON.stringify(obj.data.location.lon);
+ };
+}
diff --git
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
index 9f918f6..4c22319 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_common.h
@@ -27,25 +27,14 @@
struct pubsub_websocket_msg_header {
- uint32_t type; //msg type id (hash of fqn)
+ const char *id; //FQN
uint8_t major;
uint8_t minor;
uint32_t seqNr;
- unsigned char originUUID[16];
- uint64_t sendtimeSeconds; //seconds since epoch
- uint64_t sendTimeNanoseconds; //ns since epoch
};
typedef struct pubsub_websocket_msg_header pubsub_websocket_msg_header_t;
-struct pubsub_websocket_msg {
- pubsub_websocket_msg_header_t header;
- unsigned int payloadSize;
- char payload[];
-};
-
-typedef struct pubsub_websocket_msg pubsub_websocket_msg_t;
-
void psa_websocket_setScopeAndTopicFilter(const char* scope, const char
*topic, char *filter);
char *psa_websocket_createURI(const char *scope, const char *topic);
diff --git
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
index 1c9e4a0..1c90f3c 100644
---
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
+++
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_receiver.c
@@ -33,8 +33,8 @@
#include "pubsub_websocket_common.h"
#include <uuid/uuid.h>
-#include <pubsub_admin_metrics.h>
#include <http_admin/api.h>
+#include <jansson.h>
#ifndef UUID_STR_LEN
#define UUID_STR_LEN 37
@@ -52,10 +52,14 @@
typedef struct pubsub_websocket_rcv_buffer {
celix_thread_mutex_t mutex;
- celix_array_list_t *list; //List of received websocket messages (type:
pubsub_websocket_msg_t *)
- celix_array_list_t *rcvTimes; //Corresponding receive times of the
received websocket messages (rcvTimes[i] -> list[i])
+ celix_array_list_t *list; //List of received websocket messages (type:
pubsub_websocket_msg_entry_t *)
} pubsub_websocket_rcv_buffer_t;
+typedef struct pubsub_websocket_msg_entry {
+ size_t msgSize;
+ const char *msgData;
+} pubsub_websocket_msg_entry_t;
+
struct pubsub_websocket_topic_receiver {
celix_bundle_context_t *ctx;
log_helper_t *logHelper;
@@ -65,7 +69,6 @@ struct pubsub_websocket_topic_receiver {
char *topic;
char scopeAndTopicFilter[5];
char *uri;
- bool metricsEnabled;
pubsub_websocket_rcv_buffer_t recvBuffer;
@@ -101,26 +104,9 @@ typedef struct psa_websocket_requested_connection_entry {
bool statically; //true if the connection is statically configured through
the topic properties.
} psa_websocket_requested_connection_entry_t;
-typedef struct psa_websocket_subscriber_metrics_entry_t {
- unsigned int msgTypeId;
- uuid_t origin;
-
- unsigned long nrOfMessagesReceived;
- unsigned long nrOfSerializationErrors;
- struct timespec lastMessageReceived;
- double averageTimeBetweenMessagesInSeconds;
- double averageSerializationTimeInSeconds;
- double averageDelayInSeconds;
- double maxDelayInSeconds;
- double minDelayInSeconds;
- unsigned int lastSeqNr;
- unsigned long nrOfMissingSeqNumbers;
-} psa_websocket_subscriber_metrics_entry_t;
-
typedef struct psa_websocket_subscriber_entry {
int usageCount;
- hash_map_t *msgTypes; //map from serializer svc
- hash_map_t *metrics; //key = msg type id, value = hash_map (key = origin
uuid, value = psa_websocket_subscriber_metrics_entry_t*
+ hash_map_t *msgTypes; //key = msg type id, value = pubsub_msg_serializer_t
pubsub_subscriber_t *svc;
bool initialized; //true if the init function is called through the
receive thread
} psa_websocket_subscriber_entry_t;
@@ -131,6 +117,7 @@ static void
pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *s
static void* psa_websocket_recvThread(void * data);
static void
psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topic_receiver_t
*receiver);
static void
psa_websocket_initializeAllSubscribers(pubsub_websocket_topic_receiver_t
*receiver);
+static void *psa_websocket_getMsgTypeIdFromFqn(const char *fqn, hash_map_t
*msg_type_id_map);
static int psa_websocketTopicReceiver_data(struct mg_connection *connection,
int op_code, char *data, size_t length, void *handle);
static void psa_websocketTopicReceiver_close(const struct mg_connection
*connection, void *handle);
@@ -151,7 +138,6 @@ pubsub_websocket_topic_receiver_t*
pubsub_websocketTopicReceiver_create(celix_bu
receiver->scope = strndup(scope, 1024 * 1024);
receiver->topic = strndup(topic, 1024 * 1024);
psa_websocket_setScopeAndTopicFilter(scope, topic,
receiver->scopeAndTopicFilter);
- receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx,
PSA_WEBSOCKET_METRICS_ENABLED, PSA_WEBSOCKET_DEFAULT_METRICS_ENABLED);
receiver->uri = psa_websocket_createURI(scope, topic);
@@ -164,7 +150,6 @@ pubsub_websocket_topic_receiver_t*
pubsub_websocketTopicReceiver_create(celix_bu
receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
receiver->requestedConnections.map = hashMap_create(utils_stringHash,
NULL, utils_stringEquals, NULL);
arrayList_create(&receiver->recvBuffer.list);
- arrayList_create(&receiver->recvBuffer.rcvTimes);
}
//track subscribers
@@ -259,13 +244,6 @@ void
pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
while (hashMapIterator_hasNext(&iter)) {
psa_websocket_subscriber_entry_t *entry =
hashMapIterator_nextValue(&iter);
if (entry != NULL) {
- hash_map_iterator_t iter2 =
hashMapIterator_construct(entry->metrics);
- while (hashMapIterator_hasNext(&iter2)) {
- hash_map_t *origins = hashMapIterator_nextValue(&iter2);
- hashMap_destroy(origins, true, true);
- }
- hashMap_destroy(entry->metrics, false, false);
-
receiver->serializer->destroySerializerMap(receiver->serializer->handle,
entry->msgTypes);
free(entry);
}
@@ -300,20 +278,13 @@ void
pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
celixThreadMutex_destroy(&receiver->recvBuffer.mutex);
int msgBufSize = celix_arrayList_size(receiver->recvBuffer.list);
while(msgBufSize > 0) {
- pubsub_websocket_msg_t *msg =
celix_arrayList_get(receiver->recvBuffer.list, msgBufSize - 1);
+ pubsub_websocket_msg_entry_t *msg =
celix_arrayList_get(receiver->recvBuffer.list, msgBufSize - 1);
+ free((void *) msg->msgData);
free(msg);
msgBufSize--;
}
celix_arrayList_destroy(receiver->recvBuffer.list);
- int rcvTimesSize = celix_arrayList_size(receiver->recvBuffer.rcvTimes);
- while(rcvTimesSize > 0) {
- struct timespec *time =
celix_arrayList_get(receiver->recvBuffer.rcvTimes, rcvTimesSize - 1);
- free(time);
- rcvTimesSize--;
- }
- celix_arrayList_destroy(receiver->recvBuffer.rcvTimes);
-
free(receiver->uri);
free(receiver->scope);
free(receiver->topic);
@@ -416,16 +387,6 @@ static void
pubsub_websocketTopicReceiver_addSubscriber(void *handle, void *svc,
int rc =
receiver->serializer->createSerializerMap(receiver->serializer->handle,
(celix_bundle_t*)bnd, &entry->msgTypes);
if (rc == 0) {
- entry->metrics = hashMap_create(NULL, NULL, NULL, NULL);
- hash_map_iterator_t iter =
hashMapIterator_construct(entry->msgTypes);
- while (hashMapIterator_hasNext(&iter)) {
- pubsub_msg_serializer_t *msgSer =
hashMapIterator_nextValue(&iter);
- hash_map_t *origins = hashMap_create(utils_stringHash, NULL,
utils_stringEquals, NULL);
- hashMap_put(entry->metrics, (void*)(uintptr_t)msgSer->msgId,
origins);
- }
- }
-
- if (rc == 0) {
hashMap_put(receiver->subscribers.map, (void*)bndId, entry);
} else {
L_ERROR("[PSA_WEBSOCKET] Cannot create msg serializer map for
TopicReceiver %s/%s", receiver->scope, receiver->topic);
@@ -452,118 +413,98 @@ static void
pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *s
if (rc != 0) {
L_ERROR("[PSA_WEBSOCKET] Cannot destroy msg serializers map for
TopicReceiver %s/%s", receiver->scope, receiver->topic);
}
- hash_map_iterator_t iter = hashMapIterator_construct(entry->metrics);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_t *origins = hashMapIterator_nextValue(&iter);
- hashMap_destroy(origins, true, true);
- }
- hashMap_destroy(entry->metrics, false, false);
free(entry);
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
-static inline void
processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_t *receiver,
psa_websocket_subscriber_entry_t* entry, const pubsub_websocket_msg_header_t
*hdr, const void* payload, size_t payloadSize, struct timespec *receiveTime) {
+static void * psa_websocket_getMsgTypeIdFromFqn(const char *fqn, hash_map_t
*msg_type_id_map) {
+ void *msgTypeId = NULL;
+ if(fqn != NULL && msg_type_id_map != NULL) {
+ hash_map_iterator_t iter = hashMapIterator_construct(msg_type_id_map);
+ while (hashMapIterator_hasNext(&iter)) {
+ hash_map_entry_t *entry = hashMapIterator_nextEntry(&iter);
+ pubsub_msg_serializer_t *serializer = hashMapEntry_getValue(entry);
+ if(strcmp(serializer->msgName, fqn) == 0) {
+ msgTypeId = hashMapEntry_getKey(entry);
+ return msgTypeId;
+ }
+ }
+ }
+
+ return msgTypeId;
+}
+
+static inline void
processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_t *receiver,
psa_websocket_subscriber_entry_t* entry, pubsub_websocket_msg_header_t *hdr,
const char* payload, size_t payloadSize) {
//NOTE receiver->subscribers.mutex locked
- pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes,
(void*)(uintptr_t)(hdr->type));
+ void *msgTypeId = psa_websocket_getMsgTypeIdFromFqn(hdr->id,
entry->msgTypes);
+ pubsub_msg_serializer_t* msgSer = hashMap_get(entry->msgTypes, msgTypeId);
pubsub_subscriber_t *svc = entry->svc;
- bool monitor = receiver->metricsEnabled;
-
- //monitoring
- struct timespec beginSer;
- struct timespec endSer;
- int updateReceiveCount = 0;
- int updateSerError = 0;
- if (msgSer!= NULL) {
+ if (msgSer!= NULL && msgTypeId != 0) {
void *deserializedMsg = NULL;
bool validVersion = psa_websocket_checkVersion(msgSer->msgVersion,
hdr);
if (validVersion) {
- if (monitor) {
- clock_gettime(CLOCK_REALTIME, &beginSer);
- }
celix_status_t status = msgSer->deserialize(msgSer->handle,
payload, payloadSize, &deserializedMsg);
- if (monitor) {
- clock_gettime(CLOCK_REALTIME, &endSer);
- }
+
if (status == CELIX_SUCCESS) {
bool release = true;
svc->receive(svc->handle, msgSer->msgName, msgSer->msgId,
deserializedMsg, &release);
if (release) {
msgSer->freeMsg(msgSer->handle, deserializedMsg);
}
- updateReceiveCount += 1;
} else {
- updateSerError += 1;
L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for
scope/topic %s/%s", msgSer->msgName, receiver->scope, receiver->topic);
}
}
} else {
- L_WARN("[PSA_WEBSOCKET_TR] Cannot find serializer for type id 0x%X",
hdr->type);
- }
-
- if (msgSer != NULL && monitor) {
- hash_map_t *origins = hashMap_get(entry->metrics, (void*)(uintptr_t
)hdr->type);
- char uuidStr[UUID_STR_LEN+1];
- uuid_unparse(hdr->originUUID, uuidStr);
- psa_websocket_subscriber_metrics_entry_t *metrics =
hashMap_get(origins, uuidStr);
-
- if (metrics == NULL) {
- metrics = calloc(1, sizeof(*metrics));
- hashMap_put(origins, strndup(uuidStr, UUID_STR_LEN+1), metrics);
- uuid_copy(metrics->origin, hdr->originUUID);
- metrics->msgTypeId = hdr->type;
- metrics->maxDelayInSeconds = -INFINITY;
- metrics->minDelayInSeconds = INFINITY;
- metrics->lastSeqNr = 0;
- }
-
- double diff = celix_difftime(&beginSer, &endSer);
- long n = metrics->nrOfMessagesReceived;
- metrics->averageSerializationTimeInSeconds =
(metrics->averageSerializationTimeInSeconds * n + diff) / (n+1);
-
- diff = celix_difftime(&metrics->lastMessageReceived, receiveTime);
- n = metrics->nrOfMessagesReceived;
- if (metrics->nrOfMessagesReceived >= 1) {
- metrics->averageTimeBetweenMessagesInSeconds =
(metrics->averageTimeBetweenMessagesInSeconds * n + diff) / (n + 1);
- }
- metrics->lastMessageReceived = *receiveTime;
-
-
- int incr = hdr->seqNr - metrics->lastSeqNr;
- if (metrics->lastSeqNr >0 && incr > 1) {
- metrics->nrOfMissingSeqNumbers += (incr - 1);
- L_WARN("Missing message seq nr went from %i to %i",
metrics->lastSeqNr, hdr->seqNr);
- }
- metrics->lastSeqNr = hdr->seqNr;
-
- struct timespec sendTime;
- sendTime.tv_sec = (time_t)hdr->sendtimeSeconds;
- sendTime.tv_nsec = (long)hdr->sendTimeNanoseconds; //TODO FIXME the
tv_nsec is not correct
- diff = celix_difftime(&sendTime, receiveTime);
- metrics->averageDelayInSeconds = (metrics->averageDelayInSeconds * n +
diff) / (n+1);
- if (diff < metrics->minDelayInSeconds) {
- metrics->minDelayInSeconds = diff;
- }
- if (diff > metrics->maxDelayInSeconds) {
- metrics->maxDelayInSeconds = diff;
- }
-
- metrics->nrOfMessagesReceived += updateReceiveCount;
- metrics->nrOfSerializationErrors += updateSerError;
+ L_WARN("[PSA_WEBSOCKET_TR] Cannot find serializer for type id 0x%X,
fqn %s", msgTypeId, hdr->id);
}
}
-static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver,
const pubsub_websocket_msg_header_t *hdr, const char *payload, size_t
payloadSize, struct timespec *receiveTime) {
- celixThreadMutex_lock(&receiver->subscribers.mutex);
- hash_map_iterator_t iter =
hashMapIterator_construct(receiver->subscribers.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_websocket_subscriber_entry_t *entry =
hashMapIterator_nextValue(&iter);
- if (entry != NULL) {
- processMsgForSubscriberEntry(receiver, entry, hdr, payload,
payloadSize, receiveTime);
+static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver,
const char *msg, size_t msgSize) {
+ json_error_t error;
+ json_t *jsMsg = json_loadb(msg, msgSize, 0, &error);
+ if(jsMsg != NULL) {
+ json_t *jsId = json_object_get(jsMsg, "id");
+ json_t *jsMajor = json_object_get(jsMsg, "major");
+ json_t *jsMinor = json_object_get(jsMsg, "minor");
+ json_t *jsSeqNr = json_object_get(jsMsg, "seqNr");
+ json_t *jsData = json_object_get(jsMsg, "data");
+
+ if (jsId && jsMajor && jsMinor && jsSeqNr && jsData) {
+ pubsub_websocket_msg_header_t hdr;
+ hdr.id = json_string_value(jsId);
+ hdr.major = (uint8_t) json_integer_value(jsMajor);
+ hdr.minor = (uint8_t) json_integer_value(jsMinor);
+ hdr.seqNr = (uint32_t) json_integer_value(jsSeqNr);
+ const char *payload = json_dumps(jsData, 0);
+ size_t payloadSize = strlen(payload);
+ printf("Received msg: id %s\tmajor %u\tminor %u\tseqNr %u\tdata
%s\n", hdr.id, hdr.major, hdr.minor, hdr.seqNr, payload);
+
+ celixThreadMutex_lock(&receiver->subscribers.mutex);
+ hash_map_iterator_t iter =
hashMapIterator_construct(receiver->subscribers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_websocket_subscriber_entry_t *entry =
hashMapIterator_nextValue(&iter);
+ if (entry != NULL) {
+ processMsgForSubscriberEntry(receiver, entry, &hdr,
payload, payloadSize);
+ }
+ }
+ celixThreadMutex_unlock(&receiver->subscribers.mutex);
+ free((void *) hdr.id);
+ free((void *) payload);
+ } else {
+ L_WARN("[PSA_WEBSOCKET_TR] Received unsupported message: "
+ "ID = %s, major = %d, minor = %d, seqNr = %d, data valid?
%s",
+ (jsId ? json_string_value(jsId) : "ERROR"),
+ json_integer_value(jsMajor), json_integer_value(jsMinor),
+ json_integer_value(jsSeqNr), (jsData ? "TRUE" : "FALSE"));
}
+ } else {
+ L_WARN("[PSA_WEBSOCKET_TR] Failed to load websocket JSON message,
error: %s", error.line, error.text);
+ return;
}
- celixThreadMutex_unlock(&receiver->subscribers.mutex);
+
}
static void* psa_websocket_recvThread(void * data) {
@@ -592,18 +533,13 @@ static void* psa_websocket_recvThread(void * data) {
while(celix_arrayList_size(receiver->recvBuffer.list) > 0) {
celixThreadMutex_lock(&receiver->recvBuffer.mutex);
- pubsub_websocket_msg_t *msg = (pubsub_websocket_msg_t *)
celix_arrayList_get(receiver->recvBuffer.list, 0);
- struct timespec *rcvTime = (struct timespec *)
celix_arrayList_get(receiver->recvBuffer.rcvTimes, 0);
+ pubsub_websocket_msg_entry_t *msg = (pubsub_websocket_msg_entry_t
*) celix_arrayList_get(receiver->recvBuffer.list, 0);
+ celix_arrayList_removeAt(receiver->recvBuffer.list, 0);
celixThreadMutex_unlock(&receiver->recvBuffer.mutex);
- processMsg(receiver, &msg->header, msg->payload, msg->payloadSize,
rcvTime);
+ processMsg(receiver, msg->msgData, msg->msgSize);
+ free((void *)msg->msgData);
free(msg);
- free(rcvTime);
-
- celixThreadMutex_lock(&receiver->recvBuffer.mutex);
- celix_arrayList_removeAt(receiver->recvBuffer.list, 0);
- celix_arrayList_removeAt(receiver->recvBuffer.rcvTimes, 0);
- celixThreadMutex_unlock(&receiver->recvBuffer.mutex);
}
celixThreadMutex_lock(&receiver->recvThread.mutex);
@@ -631,21 +567,14 @@ static int psa_websocketTopicReceiver_data(struct
mg_connection *connection __at
if (handle != NULL) {
psa_websocket_requested_connection_entry_t *entry =
(psa_websocket_requested_connection_entry_t *) handle;
- pubsub_websocket_msg_t *rcvdMsg = malloc(length);
- memcpy(rcvdMsg, data, length);
-
- //Check if payload is completely received
- unsigned long rcvdPayloadSize = length - sizeof(rcvdMsg->header) -
sizeof(rcvdMsg->payloadSize);
- if(rcvdMsg->payloadSize == rcvdPayloadSize) {
- celixThreadMutex_lock(&entry->recvBuffer->mutex);
- celix_arrayList_add(entry->recvBuffer->list, rcvdMsg);
- struct timespec *receiveTime = malloc(sizeof(*receiveTime));
- clock_gettime(CLOCK_REALTIME, receiveTime);
- celix_arrayList_add(entry->recvBuffer->rcvTimes, receiveTime);
- celixThreadMutex_unlock(&entry->recvBuffer->mutex);
- } else {
- free(rcvdMsg);
- }
+ celixThreadMutex_lock(&entry->recvBuffer->mutex);
+ pubsub_websocket_msg_entry_t *msg = malloc(sizeof(*msg));
+ const char *rcvdMsgData = malloc(length);
+ memcpy((void *) rcvdMsgData, data, length);
+ msg->msgData = rcvdMsgData;
+ msg->msgSize = length;
+ celix_arrayList_add(entry->recvBuffer->list, msg);
+ celixThreadMutex_unlock(&entry->recvBuffer->mutex);
}
return 1; //keep open (non-zero), 0 to close the socket
@@ -660,67 +589,6 @@ static void psa_websocketTopicReceiver_close(const struct
mg_connection *connect
}
}
-pubsub_admin_receiver_metrics_t*
pubsub_websocketTopicReceiver_metrics(pubsub_websocket_topic_receiver_t
*receiver) {
- pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
- snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s",
receiver->scope);
- snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s",
receiver->topic);
-
- size_t msgTypesCount = 0;
- celixThreadMutex_lock(&receiver->subscribers.mutex);
- hash_map_iterator_t iter =
hashMapIterator_construct(receiver->subscribers.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_websocket_subscriber_entry_t *entry =
hashMapIterator_nextValue(&iter);
- hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
- while (hashMapIterator_hasNext(&iter2)) {
- hashMapIterator_nextValue(&iter2);
- msgTypesCount += 1;
- }
- }
-
- result->nrOfMsgTypes = (unsigned long)msgTypesCount;
- result->msgTypes = calloc(msgTypesCount, sizeof(*result->msgTypes));
- int i = 0;
- iter = hashMapIterator_construct(receiver->subscribers.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_websocket_subscriber_entry_t *entry =
hashMapIterator_nextValue(&iter);
- hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
- while (hashMapIterator_hasNext(&iter2)) {
- hash_map_t *origins = hashMapIterator_nextValue(&iter2);
- result->msgTypes[i].origins =
calloc((size_t)hashMap_size(origins), sizeof(*(result->msgTypes[i].origins)));
- result->msgTypes[i].nrOfOrigins = hashMap_size(origins);
- int k = 0;
- hash_map_iterator_t iter3 = hashMapIterator_construct(origins);
- while (hashMapIterator_hasNext(&iter3)) {
- psa_websocket_subscriber_metrics_entry_t *metrics =
hashMapIterator_nextValue(&iter3);
- result->msgTypes[i].typeId = metrics->msgTypeId;
- pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes,
(void*)(uintptr_t)metrics->msgTypeId);
- if (msgSer) {
- snprintf(result->msgTypes[i].typeFqn,
PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", msgSer->msgName);
- uuid_copy(result->msgTypes[i].origins[k].originUUID,
metrics->origin);
- result->msgTypes[i].origins[k].nrOfMessagesReceived =
metrics->nrOfMessagesReceived;
- result->msgTypes[i].origins[k].nrOfSerializationErrors =
metrics->nrOfSerializationErrors;
- result->msgTypes[i].origins[k].averageDelayInSeconds =
metrics->averageDelayInSeconds;
- result->msgTypes[i].origins[k].maxDelayInSeconds =
metrics->maxDelayInSeconds;
- result->msgTypes[i].origins[k].minDelayInSeconds =
metrics->minDelayInSeconds;
-
result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds =
metrics->averageTimeBetweenMessagesInSeconds;
-
result->msgTypes[i].origins[k].averageSerializationTimeInSeconds =
metrics->averageSerializationTimeInSeconds;
- result->msgTypes[i].origins[k].lastMessageReceived =
metrics->lastMessageReceived;
- result->msgTypes[i].origins[k].nrOfMissingSeqNumbers =
metrics->nrOfMissingSeqNumbers;
-
- k += 1;
- } else {
- L_WARN("[PSA_WEBSOCKET]: Error cannot find key 0x%X in msg
map during metrics collection!\n", metrics->msgTypeId);
- }
- }
- i +=1 ;
- }
- }
-
- celixThreadMutex_unlock(&receiver->subscribers.mutex);
-
- return result;
-}
-
static void
psa_websocket_connectToAllRequestedConnections(pubsub_websocket_topic_receiver_t
*receiver) {
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
diff --git
a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
index 498b3e3..d3b484c 100644
--- a/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_websocket/src/pubsub_websocket_topic_sender.c
@@ -29,6 +29,7 @@
#include "pubsub_psa_websocket_constants.h"
#include "pubsub_websocket_common.h"
#include <uuid/uuid.h>
+#include <jansson.h>
#include "celix_constants.h"
#include "http_admin/api.h"
#include "civetweb.h"
@@ -49,8 +50,6 @@ struct pubsub_websocket_topic_sender {
log_helper_t *logHelper;
long serializerSvcId;
pubsub_serializer_service_t *serializer;
- uuid_t fwUUID;
- bool metricsEnabled;
char *scope;
char *topic;
@@ -75,17 +74,7 @@ struct pubsub_websocket_topic_sender {
typedef struct psa_websocket_send_msg_entry {
pubsub_websocket_msg_header_t header; //partially filled header (only
seqnr and time needs to be updated per send)
pubsub_msg_serializer_t *msgSer;
- celix_thread_mutex_t sendLock; //protects send & Seqnr
- unsigned int seqNr;
- struct {
- celix_thread_mutex_t mutex; //protects entries in struct
- unsigned long nrOfMessagesSend;
- unsigned long nrOfMessagesSendFailed;
- unsigned long nrOfSerializationErrors;
- struct timespec lastMessageSend;
- double averageTimeBetweenMessagesInSeconds;
- double averageSerializationTimeInSeconds;
- } metrics;
+ celix_thread_mutex_t sendLock; //protects send & header(.seqNr)
} psa_websocket_send_msg_entry_t;
typedef struct psa_websocket_bounded_service_entry {
@@ -121,12 +110,6 @@ pubsub_websocket_topic_sender_t*
pubsub_websocketTopicSender_create(
sender->serializerSvcId = serializerSvcId;
sender->serializer = ser;
psa_websocket_setScopeAndTopicFilter(scope, topic,
sender->scopeAndTopicFilter);
- const char* uuid = celix_bundleContext_getProperty(ctx,
OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
- if (uuid != NULL) {
- uuid_parse(uuid, sender->fwUUID);
- }
- sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx,
PSA_WEBSOCKET_METRICS_ENABLED, PSA_WEBSOCKET_DEFAULT_METRICS_ENABLED);
-
sender->uri = psa_websocket_createURI(scope, topic);
if (sender->uri != NULL) {
@@ -191,7 +174,6 @@ void
pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender
hash_map_iterator_t iter2 =
hashMapIterator_construct(entry->msgEntries);
while (hashMapIterator_hasNext(&iter2)) {
psa_websocket_send_msg_entry_t *msgEntry =
hashMapIterator_nextValue(&iter2);
- celixThreadMutex_destroy(&msgEntry->metrics.mutex);
free(msgEntry);
}
@@ -260,15 +242,13 @@ static void* psa_websocket_getPublisherService(void
*handle, const celix_bundle_
void *key = hashMapEntry_getKey(hashMapEntry);
psa_websocket_send_msg_entry_t *sendEntry = calloc(1,
sizeof(*sendEntry));
sendEntry->msgSer = hashMapEntry_getValue(hashMapEntry);
- sendEntry->header.type = (int32_t)sendEntry->msgSer->msgId;
+ sendEntry->header.id = sendEntry->msgSer->msgName;
int major;
int minor;
version_getMajor(sendEntry->msgSer->msgVersion, &major);
version_getMinor(sendEntry->msgSer->msgVersion, &minor);
sendEntry->header.major = (uint8_t)major;
sendEntry->header.minor = (uint8_t)minor;
- uuid_copy(sendEntry->header.originUUID, sender->fwUUID);
- celixThreadMutex_create(&sendEntry->metrics.mutex, NULL);
hashMap_put(entry->msgEntries, key, sendEntry);
hashMap_put(entry->msgTypeIds,
strndup(sendEntry->msgSer->msgName, 1024), (void *)(uintptr_t)
sendEntry->msgSer->msgId);
}
@@ -305,7 +285,6 @@ static void psa_websocket_ungetPublisherService(void
*handle, const celix_bundle
hash_map_iterator_t iter =
hashMapIterator_construct(entry->msgEntries);
while (hashMapIterator_hasNext(&iter)) {
psa_websocket_send_msg_entry_t *msgEntry =
hashMapIterator_nextValue(&iter);
- celixThreadMutex_destroy(&msgEntry->metrics.mutex);
free(msgEntry);
}
hashMap_destroy(entry->msgEntries, false, false);
@@ -316,149 +295,60 @@ static void psa_websocket_ungetPublisherService(void
*handle, const celix_bundle
celixThreadMutex_unlock(&sender->boundedServices.mutex);
}
-pubsub_admin_sender_metrics_t*
pubsub_websocketTopicSender_metrics(pubsub_websocket_topic_sender_t *sender) {
- pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result));
- snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s",
sender->scope);
- snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s",
sender->topic);
- celixThreadMutex_lock(&sender->boundedServices.mutex);
- size_t count = 0;
- hash_map_iterator_t iter =
hashMapIterator_construct(sender->boundedServices.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_websocket_bounded_service_entry_t *entry =
hashMapIterator_nextValue(&iter);
- hash_map_iterator_t iter2 =
hashMapIterator_construct(entry->msgEntries);
- while (hashMapIterator_hasNext(&iter2)) {
- hashMapIterator_nextValue(&iter2);
- count += 1;
- }
- }
-
- result->msgMetrics = calloc(count, sizeof(*result));
-
- iter = hashMapIterator_construct(sender->boundedServices.map);
- int i = 0;
- while (hashMapIterator_hasNext(&iter)) {
- psa_websocket_bounded_service_entry_t *entry =
hashMapIterator_nextValue(&iter);
- hash_map_iterator_t iter2 =
hashMapIterator_construct(entry->msgEntries);
- while (hashMapIterator_hasNext(&iter2)) {
- psa_websocket_send_msg_entry_t *mEntry =
hashMapIterator_nextValue(&iter2);
- celixThreadMutex_lock(&mEntry->metrics.mutex);
- result->msgMetrics[i].nrOfMessagesSend =
mEntry->metrics.nrOfMessagesSend;
- result->msgMetrics[i].nrOfMessagesSendFailed =
mEntry->metrics.nrOfMessagesSendFailed;
- result->msgMetrics[i].nrOfSerializationErrors =
mEntry->metrics.nrOfSerializationErrors;
- result->msgMetrics[i].averageSerializationTimeInSeconds =
mEntry->metrics.averageSerializationTimeInSeconds;
- result->msgMetrics[i].averageTimeBetweenMessagesInSeconds =
mEntry->metrics.averageTimeBetweenMessagesInSeconds;
- result->msgMetrics[i].lastMessageSend =
mEntry->metrics.lastMessageSend;
- result->msgMetrics[i].bndId = entry->bndId;
- result->msgMetrics[i].typeId = mEntry->header.type;
- snprintf(result->msgMetrics[i].typeFqn,
PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", mEntry->msgSer->msgName);
- i += 1;
- celixThreadMutex_unlock(&mEntry->metrics.mutex);
- }
- }
-
- celixThreadMutex_unlock(&sender->boundedServices.mutex);
- result->nrOfmsgMetrics = (int)count;
- return result;
-}
-
static int psa_websocket_topicPublicationSend(void* handle, unsigned int
msgTypeId, const void *inMsg) {
int status = CELIX_SERVICE_EXCEPTION;
psa_websocket_bounded_service_entry_t *bound = handle;
pubsub_websocket_topic_sender_t *sender = bound->parent;
- bool monitor = sender->metricsEnabled;
psa_websocket_send_msg_entry_t *entry = hashMap_get(bound->msgEntries,
(void *) (uintptr_t) (msgTypeId));
- //metrics updates
- struct timespec sendTime;
- struct timespec serializationStart;
- struct timespec serializationEnd;
- //int unknownMessageCountUpdate = 0;
- int sendErrorUpdate = 0;
- int serializationErrorUpdate = 0;
- int sendCountUpdate = 0;
-
if (sender->sockConnection != NULL && entry != NULL) {
delay_first_send_for_late_joiners(sender);
- if (monitor) {
- clock_gettime(CLOCK_REALTIME, &serializationStart);
- }
-
void *serializedOutput = NULL;
size_t serializedOutputLen = 0;
status = entry->msgSer->serialize(entry->msgSer->handle, inMsg,
&serializedOutput, &serializedOutputLen);
- if (monitor) {
- clock_gettime(CLOCK_REALTIME, &serializationEnd);
- }
-
if (status == CELIX_SUCCESS /*ser ok*/) {
+ json_error_t jsError;
unsigned char *hdrEncoded =
calloc(sizeof(pubsub_websocket_msg_header_t), sizeof(unsigned char));
celixThreadMutex_lock(&entry->sendLock);
- pubsub_websocket_msg_t *msg = malloc(sizeof(*msg) +
sizeof(char[serializedOutputLen]));
- pubsub_websocket_msg_header_t *msgHdr = &entry->header;
- if (monitor) {
- clock_gettime(CLOCK_REALTIME, &sendTime);
- msgHdr->sendtimeSeconds = (uint64_t) sendTime.tv_sec;
- msgHdr->sendTimeNanoseconds = (uint64_t) sendTime.tv_nsec;
- msgHdr->seqNr++;
- }
- memcpy(&msg->header, msgHdr,
sizeof(pubsub_websocket_msg_header_t));
-
- msg->payloadSize = (unsigned int) serializedOutputLen;
- size_t hdr_size = sizeof(msg->header);
- size_t ps_size = sizeof(msg->payloadSize);
- size_t bytes_to_write = hdr_size + ps_size +
serializedOutputLen;//sizeof(*msg);
- memcpy(msg->payload, serializedOutput, serializedOutputLen);
- int bytes_written =
mg_websocket_client_write(sender->sockConnection, MG_WEBSOCKET_OPCODE_TEXT,
(char *) msg, bytes_to_write);
-
- celixThreadMutex_unlock(&entry->sendLock);
- if (bytes_written == (int) bytes_to_write) {
- sendCountUpdate = 1;
+ json_t *jsMsg = json_object();
+ json_object_set_new(jsMsg, "id", json_string(entry->header.id));
+ json_object_set_new(jsMsg, "major",
json_integer(entry->header.major));
+ json_object_set_new(jsMsg, "minor",
json_integer(entry->header.minor));
+ json_object_set_new(jsMsg, "seqNr",
json_integer(entry->header.seqNr++));
+
+ json_t *jsData;
+ jsData = json_loadb((const char *)serializedOutput,
serializedOutputLen - 1, 0, &jsError);
+ if(jsData != NULL) {
+ json_object_set_new(jsMsg, "data", jsData);
+ const char *msg = json_dumps(jsMsg, 0);
+ size_t bytes_to_write = strlen(msg);
+ int bytes_written =
mg_websocket_client_write(sender->sockConnection, MG_WEBSOCKET_OPCODE_TEXT, msg,
+ bytes_to_write);
+ free((void *) msg);
+ json_decref(jsData); //Decrease ref count means freeing the
object
+ if (bytes_written != (int) bytes_to_write) {
+ L_WARN("[PSA_WEBSOCKET_TS] Error sending websocket,
written %lu of total %lu bytes", bytes_written, bytes_to_write);
+ }
} else {
- sendErrorUpdate = 1;
- L_WARN("[PSA_WEBSOCKET_TS] Error sending websocket.");
+ L_WARN("[PSA_WEBSOCKET_TS] Error sending websocket, serialized
data corrupt. Error(%d;%d;%d): %s", jsError.column, jsError.line,
jsError.position, jsError.text);
}
+ celixThreadMutex_unlock(&entry->sendLock);
- free(msg);
+ json_decref(jsMsg); //Decrease ref count means freeing the object
free(hdrEncoded);
free(serializedOutput);
} else {
- serializationErrorUpdate = 1;
L_WARN("[PSA_WEBSOCKET_TS] Error serialize message of type %s for
scope/topic %s/%s",
entry->msgSer->msgName, sender->scope, sender->topic);
}
} else if (entry == NULL){
- //unknownMessageCountUpdate = 1;
L_WARN("[PSA_WEBSOCKET_TS] Error sending message with msg type id %i
for scope/topic %s/%s", msgTypeId, sender->scope, sender->topic);
}
-
- if (monitor && status == CELIX_SUCCESS) {
- celixThreadMutex_lock(&entry->metrics.mutex);
-
- long n = entry->metrics.nrOfMessagesSend +
entry->metrics.nrOfMessagesSendFailed;
- double diff = celix_difftime(&serializationStart, &serializationEnd);
- double average = (entry->metrics.averageSerializationTimeInSeconds * n
+ diff) / (n+1);
- entry->metrics.averageSerializationTimeInSeconds = average;
-
- if (entry->metrics.nrOfMessagesSend > 2) {
- diff = celix_difftime(&entry->metrics.lastMessageSend, &sendTime);
- n = entry->metrics.nrOfMessagesSend;
- average = (entry->metrics.averageTimeBetweenMessagesInSeconds * n
+ diff) / (n+1);
- entry->metrics.averageTimeBetweenMessagesInSeconds = average;
- }
-
- entry->metrics.lastMessageSend = sendTime;
- entry->metrics.nrOfMessagesSend += sendCountUpdate;
- entry->metrics.nrOfMessagesSendFailed += sendErrorUpdate;
- entry->metrics.nrOfSerializationErrors += serializationErrorUpdate;
-
- celixThreadMutex_unlock(&entry->metrics.mutex);
- }
-
return status;
}