This is an automated email from the ASF dual-hosted git repository.

pnoltes pushed a commit to branch feature/use_ser_hander_in_psa
in repository https://gitbox.apache.org/repos/asf/celix.git

commit fea21167fcfc4c351f9d35a1bf72898bfbee89d5
Author: Pepijn Noltes <[email protected]>
AuthorDate: Wed May 26 20:03:57 2021 +0200

    Adds serializer handler to pubsub websocket.
---
 .../v2/src/pubsub_tcp_topic_receiver.c             |   4 +
 .../v2/src/pubsub_tcp_topic_sender.c               |  80 ++------
 .../pubsub_admin_websocket/v2/src/psa_activator.c  |  11 -
 .../v2/src/pubsub_websocket_admin.c                | 227 +++++----------------
 .../v2/src/pubsub_websocket_topic_receiver.c       |  52 +++--
 .../v2/src/pubsub_websocket_topic_receiver.h       |   5 +-
 .../v2/src/pubsub_websocket_topic_sender.c         | 102 +++------
 .../v2/src/pubsub_websocket_topic_sender.h         |   3 +-
 .../v2/src/pubsub_zmq_topic_receiver.c             |   4 +
 .../v2/src/pubsub_zmq_topic_sender.c               |   1 +
 .../pubsub_utils/src/pubsub_serializer_handler.c   |  17 +-
 11 files changed, 144 insertions(+), 362 deletions(-)

diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c 
b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index b02768a..f602be9 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -459,6 +459,10 @@ processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t 
*receiver, psa_tcp_subs
     //NOTE receiver->subscribers.mutex locked
 
     const char* msgFqn = 
pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, 
message->header.msgId);
+    if (msgFqn == NULL) {
+        L_WARN("Cannot find msg fqn for msg id %u", message->header.msgId);
+        return;
+    }
 
     void *deSerializedMsg = NULL;
     bool validVersion = 
pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, 
message->header.msgId, message->header.msgMajorVersion, 
message->header.msgMinorVersion);
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c 
b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
index 11d73d9..47ef05a 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
@@ -67,6 +67,7 @@ struct pubsub_tcp_topic_sender {
     bool isPassive;
     bool verbose;
     unsigned long send_delay;
+    int seqNr; //atomic
 
     struct {
         long svcId;
@@ -79,23 +80,10 @@ struct pubsub_tcp_topic_sender {
     } boundedServices;
 };
 
-typedef struct psa_tcp_send_msg_entry {
-    uint32_t type; //msg type id (hash of fqn)
-    const char *fqn;
-    uint8_t major;
-    uint8_t minor;
-    unsigned char originUUID[16];
-    pubsub_protocol_service_t *protSer;
-    struct iovec *serializedIoVecOutput;
-    size_t serializedIoVecOutputLen;
-    unsigned int seqNr;
-} psa_tcp_send_msg_entry_t;
-
 typedef struct psa_tcp_bounded_service_entry {
     pubsub_tcp_topic_sender_t *parent;
     pubsub_publisher_t service;
     long bndId;
-    hash_map_t *msgEntries; //key = msg type id, value = 
psa_tcp_send_msg_entry_t
     int getCount;
 } psa_tcp_bounded_service_entry_t;
 
@@ -272,18 +260,7 @@ void 
pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender) {
         hash_map_iterator_t iter = 
hashMapIterator_construct(sender->boundedServices.map);
         while (hashMapIterator_hasNext(&iter)) {
             psa_tcp_bounded_service_entry_t *entry = 
hashMapIterator_nextValue(&iter);
-            if (entry != NULL) {
-                hash_map_iterator_t iter2 = 
hashMapIterator_construct(entry->msgEntries);
-                while (hashMapIterator_hasNext(&iter2)) {
-                    psa_tcp_send_msg_entry_t *msgEntry = 
hashMapIterator_nextValue(&iter2);
-                    if (msgEntry->serializedIoVecOutput)
-                        free(msgEntry->serializedIoVecOutput);
-                    msgEntry->serializedIoVecOutput = NULL;
-                    free(msgEntry);
-                }
-                hashMap_destroy(entry->msgEntries, false, false);
-                free(entry);
-            }
+            free(entry);
         }
         hashMap_destroy(sender->boundedServices.map, false, false);
         celixThreadMutex_unlock(&sender->boundedServices.mutex);
@@ -349,7 +326,6 @@ static void *psa_tcp_getPublisherService(void *handle, 
const celix_bundle_t *req
         entry->getCount = 1;
         entry->parent = sender;
         entry->bndId = bndId;
-        entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
         entry->service.handle = entry;
         entry->service.localMsgTypeIdForMsgType = 
psa_tcp_localMsgTypeIdForMsgType;
         entry->service.send = psa_tcp_topicPublicationSend;
@@ -373,16 +349,6 @@ static void psa_tcp_ungetPublisherService(void *handle, 
const celix_bundle_t *re
     if (entry != NULL && entry->getCount == 0) {
         //free entry
         hashMap_remove(sender->boundedServices.map, (void *) bndId);
-
-        hash_map_iterator_t iter = 
hashMapIterator_construct(entry->msgEntries);
-        while (hashMapIterator_hasNext(&iter)) {
-            psa_tcp_send_msg_entry_t *msgEntry = 
hashMapIterator_nextValue(&iter);
-            if (msgEntry->serializedIoVecOutput)
-                free(msgEntry->serializedIoVecOutput);
-            msgEntry->serializedIoVecOutput = NULL;
-            free(msgEntry);
-        }
-        hashMap_destroy(entry->msgEntries, false, false);
         free(entry);
     }
     celixThreadMutex_unlock(&sender->boundedServices.mutex);
@@ -391,27 +357,17 @@ static void psa_tcp_ungetPublisherService(void *handle, 
const celix_bundle_t *re
 
 static int
 psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void 
*inMsg, celix_properties_t *metadata) {
-    int status = CELIX_SUCCESS;
     psa_tcp_bounded_service_entry_t *bound = handle;
     pubsub_tcp_topic_sender_t *sender = bound->parent;
-
-
-    psa_tcp_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void *) 
(uintptr_t) (msgTypeId));
-
-    if (entry == NULL) {
-        const char* fqn = 
pubsub_serializerHandler_getMsgFqn(sender->serializerHandler, msgTypeId);
-        if (fqn != NULL) {
-            entry = calloc(1, sizeof(psa_tcp_send_msg_entry_t));
-            entry->protSer = sender->protocol;
-            entry->type = msgTypeId;
-            entry->fqn = fqn;
-            entry->major = 
pubsub_serializerHandler_getMsgMajorVersion(sender->serializerHandler, 
msgTypeId);
-            entry->minor = 
pubsub_serializerHandler_getMsgMinorVersion(sender->serializerHandler, 
msgTypeId);
-            uuid_copy(entry->originUUID, sender->fwUUID);
-            hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
-        } else {
-            L_WARN("Cannot find message serialization for msg id %i", 
(int)msgTypeId);
-        }
+    const char* msgFqn;
+    int majorVersion;
+    int minorversion;
+    celix_status_t status = 
pubsub_serializerHandler_getMsgInfo(sender->serializerHandler, msgTypeId, 
&msgFqn, &majorVersion, &minorversion);
+
+    if (status != CELIX_SUCCESS) {
+        L_WARN("Cannot find serializer for msg id %u for serializer %s", 
msgTypeId,
+               
pubsub_serializerHandler_getSerializationType(sender->serializerHandler));
+        return status;
     }
 
     delay_first_send_for_late_joiners(sender);
@@ -419,11 +375,10 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int 
msgTypeId, const void *i
     size_t serializedIoVecOutputLen = 0; //entry->serializedIoVecOutputLen;
     struct iovec *serializedIoVecOutput = NULL;
     status = pubsub_serializerHandler_serialize(sender->serializerHandler, 
msgTypeId, inMsg, &serializedIoVecOutput, &serializedIoVecOutputLen);
-    entry->serializedIoVecOutputLen = MAX(serializedIoVecOutputLen, 
entry->serializedIoVecOutputLen);
 
     bool cont = false;
     if (status == CELIX_SUCCESS) /*ser ok*/ {
-        cont = 
pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->fqn, 
msgTypeId, inMsg, &metadata);
+        cont = 
pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, 
msgTypeId, inMsg, &metadata);
     }
     if (cont) {
         pubsub_protocol_message_t message;
@@ -435,9 +390,9 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int 
msgTypeId, const void *i
             message.payload.length = serializedIoVecOutput->iov_len;
         }
         message.header.msgId = msgTypeId;
-        message.header.seqNr = entry->seqNr;
-        message.header.msgMajorVersion = entry->major;
-        message.header.msgMinorVersion = entry->minor;
+        message.header.seqNr = __atomic_fetch_add(&sender->seqNr, 1, 
__ATOMIC_RELAXED);
+        message.header.msgMajorVersion = (uint16_t)majorVersion;
+        message.header.msgMinorVersion = (uint16_t)minorversion;
         message.header.payloadSize = 0;
         message.header.payloadPartSize = 0;
         message.header.payloadOffset = 0;
@@ -445,7 +400,6 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int 
msgTypeId, const void *i
         if (metadata != NULL) {
             message.metadata.metadata = metadata;
         }
-        entry->seqNr++;
         bool sendOk = true;
         {
             int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, 
serializedIoVecOutput, serializedIoVecOutputLen, 0);
@@ -453,7 +407,7 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int 
msgTypeId, const void *i
                 status = -1;
                 sendOk = false;
             }
-            
pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, 
entry->fqn, msgTypeId, inMsg, metadata);
+            
pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, msgFqn, 
msgTypeId, inMsg, metadata);
             if (message.metadata.metadata) {
                 celix_properties_destroy(message.metadata.metadata);
             }
@@ -467,7 +421,7 @@ psa_tcp_topicPublicationSend(void *handle, unsigned int 
msgTypeId, const void *i
             L_WARN("[PSA_TCP_V2_TS] Error sending msg. %s", strerror(errno));
         }
     } else {
-        L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for 
scope/topic %s/%s", entry->fqn,
+        L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for 
scope/topic %s/%s", msgFqn,
                sender->scope == NULL ? "(null)" : sender->scope, 
sender->topic);
     }
 
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c
index 159d8ed..33cc86f 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c
@@ -53,17 +53,6 @@ int psa_websocket_start(psa_websocket_activator_t *act, 
celix_bundle_context_t *
     act->admin = pubsub_websocketAdmin_create(ctx, act->logHelper);
     celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : 
CELIX_BUNDLE_EXCEPTION;
 
-    //track serializers (only json)
-    if (status == CELIX_SUCCESS) {
-        celix_service_tracking_options_t opts = 
CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
-        opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
-        opts.filter.ignoreServiceLanguage = true;
-        opts.callbackHandle = act->admin;
-        opts.addWithProperties = pubsub_websocketAdmin_addSerializerSvc;
-        opts.removeWithProperties = pubsub_websocketAdmin_removeSerializerSvc;
-        act->serializersTrackerId = 
celix_bundleContext_trackServicesWithOptions(ctx, &opts);
-    }
-
     //register pubsub admin service
     if (status == CELIX_SUCCESS) {
         pubsub_admin_service_t *psaSvc = &act->adminService;
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c
index 8950fda..bf74d38 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c
@@ -18,18 +18,18 @@
  */
 
 #include <memory.h>
-#include <pubsub_endpoint.h>
-#include <pubsub_serializer.h>
 #include <ip_utils.h>
-#include <pubsub_message_serialization_service.h>
-#include <pubsub_matching.h>
 
+#include "pubsub_endpoint.h"
+#include "pubsub_serializer.h"
+#include "pubsub_matching.h"
 #include "pubsub_utils.h"
 #include "pubsub_websocket_admin.h"
 #include "pubsub_psa_websocket_constants.h"
 #include "pubsub_websocket_topic_sender.h"
 #include "pubsub_websocket_topic_receiver.h"
 #include "pubsub_websocket_common.h"
+#include "pubsub_serializer_handler.h"
 
 #define L_DEBUG(...) \
     celix_logHelper_log(psa->log, CELIX_LOG_LEVEL_DEBUG, __VA_ARGS__)
@@ -52,11 +52,6 @@ struct pubsub_websocket_admin {
     bool verbose;
 
     struct {
-        celix_thread_rwlock_t mutex;
-        hash_map_t *map; //key = svcId, value = 
psa_websocket_serializer_entry_t*
-    } serializers;
-
-    struct {
         celix_thread_mutex_t mutex;
         hash_map_t *map; //key = scope:topic key, value = 
pubsub_websocket_topic_sender_t*
     } topicSenders;
@@ -71,6 +66,10 @@ struct pubsub_websocket_admin {
         hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* 
(endpoint)
     } discoveredEndpoints;
 
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map; //key = pubsub message serialization marker svc id 
(long), pubsub_serialization_handler_t*.
+    } serializationHandlers;
 };
 
 static celix_status_t 
pubsub_websocketAdmin_connectEndpointToReceiver(pubsub_websocket_admin_t* psa, 
pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t 
*endpoint);
@@ -93,9 +92,6 @@ pubsub_websocket_admin_t* 
pubsub_websocketAdmin_create(celix_bundle_context_t *c
     psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, 
PSA_WEBSOCKET_QOS_SAMPLE_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_QOS_SAMPLE_SCORE);
     psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, 
PSA_WEBSOCKET_QOS_CONTROL_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_QOS_CONTROL_SCORE);
 
-    celixThreadRwlock_create(&psa->serializers.mutex, NULL);
-    psa->serializers.map = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
-
     celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
     psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
 
@@ -105,6 +101,9 @@ pubsub_websocket_admin_t* 
pubsub_websocketAdmin_create(celix_bundle_context_t *c
     celixThreadMutex_create(&psa->discoveredEndpoints.mutex, NULL);
     psa->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
 
+    celixThreadMutex_create(&psa->serializationHandlers.mutex, NULL);
+    psa->serializationHandlers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
     return psa;
 }
 
@@ -138,13 +137,13 @@ void 
pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa) {
     }
     celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
 
-    celixThreadRwlock_writeLock(&psa->serializers.mutex);
-    iter = hashMapIterator_construct(psa->serializers.map);
+    celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+    iter = hashMapIterator_construct(psa->serializationHandlers.map);
     while (hashMapIterator_hasNext(&iter)) {
-        psa_websocket_serializer_entry_t *entry = 
hashMapIterator_nextValue(&iter);
-        free(entry);
+        pubsub_serializer_handler_t* entry = hashMapIterator_nextValue(&iter);
+        pubsub_serializerHandler_destroy(entry);
     }
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
+    celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
 
     celixThreadMutex_destroy(&psa->topicSenders.mutex);
     hashMap_destroy(psa->topicSenders.map, true, false);
@@ -155,112 +154,12 @@ void 
pubsub_websocketAdmin_destroy(pubsub_websocket_admin_t *psa) {
     celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
     hashMap_destroy(psa->discoveredEndpoints.map, false, false);
 
-    celixThreadRwlock_destroy(&psa->serializers.mutex);
-    hashMap_destroy(psa->serializers.map, false, false);
+    celixThreadMutex_destroy(&psa->serializationHandlers.mutex);
+    hashMap_destroy(psa->serializationHandlers.map, false, false);
 
     free(psa);
 }
 
-void pubsub_websocketAdmin_addSerializerSvc(void *handle, void *svc, const 
celix_properties_t *props) {
-    pubsub_websocket_admin_t *psa = handle;
-
-    const char *serType = celix_properties_get(props, 
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
-    long msgId = celix_properties_getAsLong(props, 
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
-    const char *msgFqn = celix_properties_get(props, 
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
-    const char *msgVersion = celix_properties_get(props, 
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0");
-
-    if (serType == NULL || msgId == -1L || msgFqn == NULL) {
-        L_INFO("[PSA_WEBSOCKET_V2] Ignoring serializer service without one of 
the following properties: %s or %s or %s",
-               
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, 
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, 
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY);
-
-        L_INFO("[PSA_WEBSOCKET_V2] Ignored serializer type %s msgId %li fqn 
%s", serType, msgId, msgFqn);
-        return;
-    }
-    L_INFO("[PSA_WEBSOCKET_V2] Adding serializer type %s msgId %li fqn %s", 
serType, msgId, msgFqn);
-
-    celixThreadRwlock_writeLock(&psa->serializers.mutex);
-    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
-    if(typeEntries == NULL) {
-        typeEntries = hashMap_create(NULL, NULL, NULL, NULL);
-        hashMap_put(psa->serializers.map, (void*)celix_utils_strdup(serType), 
typeEntries);
-        L_INFO("[PSA_WEBSOCKET_V2] typeEntries added %p %s", 
psa->serializers.map, serType);
-    }
-    psa_websocket_serializer_entry_t *entry = hashMap_get(typeEntries, 
(void*)msgId);
-    if (entry == NULL) {
-        entry = calloc(1, sizeof(psa_websocket_serializer_entry_t));
-        entry->svc = svc;
-        entry->fqn = celix_utils_strdup(msgFqn);
-        entry->version = celix_utils_strdup(msgVersion);
-        hashMap_put(typeEntries, (void*)msgId, entry);
-        L_INFO("[PSA_WEBSOCKET_V2] entry added");
-    }
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-void pubsub_websocketAdmin_removeSerializerSvc(void *handle, void *svc, const 
celix_properties_t *props) {
-    pubsub_websocket_admin_t *psa = handle;
-    const char *serType = celix_properties_get(props, 
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
-    long msgId = celix_properties_getAsLong(props, 
PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
-
-    if(serType == NULL || msgId == -1) {
-        L_ERROR("[PSA_WEBSOCKET_V2] Error removing serializer svc %s %i", 
serType, msgId);
-        return;
-    }
-
-    //remove serializer
-    // 1) First find entry and
-    // 2) loop and destroy all topic sender using the serializer and
-    // 3) loop and destroy all topic receivers using the serializer
-    // Note that it is the responsibility of the topology manager to create 
new topic senders/receivers
-
-    celixThreadRwlock_writeLock(&psa->serializers.mutex);
-    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
-    if(typeEntries != NULL) {
-        psa_websocket_serializer_entry_t *entry = hashMap_remove(typeEntries, 
(void*)msgId);
-        free((void*)entry->fqn);
-        free((void*)entry->version);
-        free(entry);
-
-        // check if there are no remaining serializers for the given type. If 
not, remove all senders and receivers for this type.
-        if(hashMap_size(typeEntries) == 0) {
-            hashMap_destroy(hashMap_removeFreeKey(psa->serializers.map, 
serType), true, false);
-            celixThreadRwlock_unlock(&psa->serializers.mutex);
-
-            celixThreadMutex_lock(&psa->topicSenders.mutex);
-            hash_map_iterator_t iter = 
hashMapIterator_construct(psa->topicSenders.map);
-            while (hashMapIterator_hasNext(&iter)) {
-                hash_map_entry_t *senderEntry = 
hashMapIterator_nextEntry(&iter);
-                pubsub_websocket_topic_sender_t *sender = 
hashMapEntry_getValue(senderEntry);
-                if (sender != NULL && strncmp(serType, 
pubsub_websocketTopicSender_serializerType(sender), 1024 * 1024) == 0) {
-                    char *key = hashMapEntry_getKey(senderEntry);
-                    hashMapIterator_remove(&iter);
-                    pubsub_websocketTopicSender_destroy(sender);
-                    free(key);
-                }
-            }
-            celixThreadMutex_unlock(&psa->topicSenders.mutex);
-
-            celixThreadMutex_lock(&psa->topicReceivers.mutex);
-            iter = hashMapIterator_construct(psa->topicReceivers.map);
-            while (hashMapIterator_hasNext(&iter)) {
-                hash_map_entry_t *senderEntry = 
hashMapIterator_nextEntry(&iter);
-                pubsub_websocket_topic_receiver_t *receiver = 
hashMapEntry_getValue(senderEntry);
-                if (receiver != NULL && strncmp(serType, 
pubsub_websocketTopicReceiver_serializerType(receiver), 1024 * 1024) == 0) {
-                    char *key = hashMapEntry_getKey(senderEntry);
-                    hashMapIterator_remove(&iter);
-                    pubsub_websocketTopicReceiver_destroy(receiver);
-                    free(key);
-                }
-            }
-            celixThreadMutex_unlock(&psa->topicReceivers.mutex);
-        } else {
-            celixThreadRwlock_unlock(&psa->serializers.mutex);
-        }
-    } else {
-        celixThreadRwlock_unlock(&psa->serializers.mutex);
-    }
-}
-
 celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long 
svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t 
**topicProperties, double *outScore, long *outSerializerSvcId, long 
*outProtocolSvcId) {
     pubsub_websocket_admin_t *psa = handle;
     L_DEBUG("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_matchPublisher");
@@ -297,14 +196,36 @@ celix_status_t 
pubsub_websocketAdmin_matchDiscoveredEndpoint(void *handle, const
     return status;
 }
 
+static pubsub_serializer_handler_t* 
pubsub_websocketAdmin_getSerializationHandler(pubsub_websocket_admin_t* psa, 
long msgSerializationMarkerSvcId) {
+    pubsub_serializer_handler_t* handler = NULL;
+    celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+    handler = hashMap_get(psa->serializationHandlers.map, 
(void*)msgSerializationMarkerSvcId);
+    if (handler == NULL) {
+        handler = pubsub_serializerHandler_createForMarkerService(psa->ctx, 
msgSerializationMarkerSvcId, psa->log);
+        if (handler != NULL) {
+            hashMap_put(psa->serializationHandlers.map, 
(void*)msgSerializationMarkerSvcId, handler);
+        }
+    }
+    celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
+    return handler;
+}
+
+
 celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char 
*scope, const char *topic, const celix_properties_t *topicProperties, long 
serializerSvcId, long protocolSvcId, celix_properties_t **outPublisherEndpoint) 
{
     pubsub_websocket_admin_t *psa = handle;
     celix_status_t  status = CELIX_SUCCESS;
 
-    //1) Create TopicSender
-    //2) Store TopicSender
-    //3) Connect existing endpoints
-    //4) set outPublisherEndpoint
+    //1) Get serialization handler
+    //2) Create TopicSender
+    //3) Store TopicSender
+    //4) Connect existing endpoints
+    //5) set outPublisherEndpoint
+
+    pubsub_serializer_handler_t* handler = 
pubsub_websocketAdmin_getSerializationHandler(psa, serializerSvcId);
+    if (handler == NULL) {
+        L_ERROR("Cannot create topic sender without serialization handler");
+        return CELIX_ILLEGAL_STATE;
+    }
 
     celix_properties_t *newEndpoint = NULL;
 
@@ -324,7 +245,7 @@ celix_status_t pubsub_websocketAdmin_setupTopicSender(void 
*handle, const char *
     celixThreadMutex_lock(&psa->topicSenders.mutex);
     pubsub_websocket_topic_sender_t *sender = 
hashMap_get(psa->topicSenders.map, key);
     if (sender == NULL) {
-        sender = pubsub_websocketTopicSender_create(psa->ctx, psa->log, scope, 
topic, serType, psa);
+        sender = pubsub_websocketTopicSender_create(psa->ctx, psa->log, scope, 
topic, handler, psa);
         if (sender != NULL) {
             const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE;
             newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, 
PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
@@ -382,9 +303,14 @@ celix_status_t 
pubsub_websocketAdmin_teardownTopicSender(void *handle, const cha
 
 celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const 
char *scope, const char *topic, const celix_properties_t *topicProperties, long 
serializerSvcId, long protocolSvcId, celix_properties_t 
**outSubscriberEndpoint) {
     pubsub_websocket_admin_t *psa = handle;
-
     celix_properties_t *newEndpoint = NULL;
 
+    pubsub_serializer_handler_t* handler = 
pubsub_websocketAdmin_getSerializationHandler(psa, serializerSvcId);
+    if (handler == NULL) {
+        L_ERROR("Cannot create topic receiver without serialization handler");
+        return CELIX_ILLEGAL_STATE;
+    }
+
     //get serializer type
     const char *serType = NULL;
     celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
@@ -400,7 +326,7 @@ celix_status_t 
pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
     pubsub_websocket_topic_receiver_t *receiver = 
hashMap_get(psa->topicReceivers.map, key);
     if (receiver == NULL) {
-        receiver = pubsub_websocketTopicReceiver_create(psa->ctx, psa->log, 
scope, topic, topicProperties, serType, psa);
+        receiver = pubsub_websocketTopicReceiver_create(psa->ctx, psa->log, 
scope, topic, topicProperties, handler, psa);
         if (receiver != NULL) {
             const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE;
             newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
@@ -578,57 +504,11 @@ celix_status_t 
pubsub_websocketAdmin_removeDiscoveredEndpoint(void *handle, cons
     return status;
 }
 
-psa_websocket_serializer_entry_t* 
pubsub_websocketAdmin_acquireSerializerForMessageId(void *handle, const char 
*serializationType, uint32_t msgId) {
-    pubsub_websocket_admin_t *psa = handle;
-    psa_websocket_serializer_entry_t *serializer = NULL;
-
-    celixThreadRwlock_readLock(&psa->serializers.mutex);
-    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, 
serializationType);
-    if(typeEntries != NULL) {
-        serializer = hashMap_get(typeEntries, (void*)(uintptr_t)msgId);
-    }
-
-    return serializer;
-}
-
-void pubsub_websocketAdmin_releaseSerializer(void *handle, 
psa_websocket_serializer_entry_t* serializer) {
-    pubsub_websocket_admin_t *psa = handle;
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-int64_t pubsub_websocketAdmin_getMessageIdForMessageFqn(void *handle, const 
char *serializationType, const char *fqn) {
-    pubsub_websocket_admin_t *psa = handle;
-    int64_t id = -1L;
-
-    celixThreadRwlock_readLock(&psa->serializers.mutex);
-    hash_map_t *typeEntries = hashMap_get(psa->serializers.map, 
serializationType);
-    if(typeEntries != NULL) {
-        hash_map_iterator_t iterator = hashMapIterator_construct(typeEntries);
-        while(hashMapIterator_hasNext(&iterator)) {
-            void *key = hashMapIterator_nextKey(&iterator);
-            psa_websocket_serializer_entry_t *entry = hashMap_get(typeEntries, 
key);
-            L_WARN("[PSA_WEBSOCKET_V2] 
pubsub_websocketAdmin_getMessageIdForMessageFqn key fqn %s %s", entry->fqn, 
fqn);
-            if(strncmp(fqn, entry->fqn, 1024*1024) == 0) {
-                id = (uint32_t)(uintptr_t)key;
-                break;
-            }
-        }
-    } else {
-        L_WARN("[PSA_WEBSOCKET_V2] 
pubsub_websocketAdmin_getMessageIdForMessageFqn typeEntries == NULL %s %s", 
serializationType, fqn);
-    }
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
-
-    L_WARN("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_getMessageIdForMessageFqn 
%p %s %s = %i", psa->serializers.map, serializationType, fqn, id);
-
-    return id;
-}
-
 bool pubsub_websocketAdmin_executeCommand(void *handle, const char 
*commandLine __attribute__((unused)), FILE *out, FILE *errStream 
__attribute__((unused))) {
     pubsub_websocket_admin_t *psa = handle;
 
     fprintf(out, "\n");
     fprintf(out, "Topic Senders:\n");
-    celixThreadRwlock_writeLock(&psa->serializers.mutex);
     celixThreadMutex_lock(&psa->topicSenders.mutex);
     hash_map_iterator_t iter = 
hashMapIterator_construct(psa->topicSenders.map);
     while (hashMapIterator_hasNext(&iter)) {
@@ -642,11 +522,9 @@ bool pubsub_websocketAdmin_executeCommand(void *handle, 
const char *commandLine
         fprintf(out, "   |- url             = %s\n", url);
     }
     celixThreadMutex_unlock(&psa->topicSenders.mutex);
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
 
     fprintf(out, "\n");
     fprintf(out, "\nTopic Receivers:\n");
-    celixThreadRwlock_writeLock(&psa->serializers.mutex);
     celixThreadMutex_lock(&psa->topicReceivers.mutex);
     iter = hashMapIterator_construct(psa->topicReceivers.map);
     while (hashMapIterator_hasNext(&iter)) {
@@ -677,7 +555,6 @@ bool pubsub_websocketAdmin_executeCommand(void *handle, 
const char *commandLine
         celix_arrayList_destroy(unconnected);
     }
     celixThreadMutex_unlock(&psa->topicReceivers.mutex);
-    celixThreadRwlock_unlock(&psa->serializers.mutex);
     fprintf(out, "\n");
 
     return true;
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
index c93f078..ea997e9 100644
--- 
a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
+++ 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
@@ -68,10 +68,11 @@ struct pubsub_websocket_topic_receiver {
     void *admin;
     char *scope;
     char *topic;
-    char *serType;
     char scopeAndTopicFilter[5];
     char *uri;
 
+    pubsub_serializer_handler_t* serializerHandler;
+
     celix_websocket_service_t sockSvc;
     long svcId;
 
@@ -131,12 +132,12 @@ pubsub_websocket_topic_receiver_t* 
pubsub_websocketTopicReceiver_create(celix_bu
                                                               const char 
*scope,
                                                               const char 
*topic,
                                                               const 
celix_properties_t *topicProperties,
-                                                              const char 
*serType,
+                                                              
pubsub_serializer_handler_t* serializerHandler,
                                                               void *admin) {
     pubsub_websocket_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
     receiver->ctx = ctx;
     receiver->logHelper = logHelper;
-    receiver->serType = celix_utils_strdup(serType);
+    receiver->serializerHandler = serializerHandler;
     receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
     receiver->topic = strndup(topic, 1024 * 1024);
     receiver->admin = admin;
@@ -309,7 +310,6 @@ void 
pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *re
         free(receiver->uri);
         free(receiver->scope);
         free(receiver->topic);
-        free(receiver->serType);
     }
     free(receiver);
 }
@@ -325,7 +325,7 @@ const char* 
pubsub_websocketTopicReceiver_url(pubsub_websocket_topic_receiver_t
 }
 
 const char 
*pubsub_websocketTopicReceiver_serializerType(pubsub_websocket_topic_receiver_t 
*receiver) {
-    return receiver->serType;
+    return 
pubsub_serializerHandler_getSerializationType(receiver->serializerHandler);
 }
 
 void 
pubsub_websocketTopicReceiver_listConnections(pubsub_websocket_topic_receiver_t 
*receiver, celix_array_list_t *connectedUrls, celix_array_list_t 
*unconnectedUrls) {
@@ -451,58 +451,52 @@ static void 
pubsub_websocketTopicReceiver_removeSubscriber(void *handle, void *s
 static inline void 
processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_t *receiver, 
psa_websocket_subscriber_entry_t* entry, pubsub_websocket_msg_header_t *hdr, 
const char* payload, size_t payloadSize) {
     //NOTE receiver->subscribers.mutex locked
 
-    int64_t msgTypeId = 
pubsub_websocketAdmin_getMessageIdForMessageFqn(receiver->admin, 
receiver->serType, hdr->id);
-
-    if(msgTypeId < 0) {
-        L_WARN("[PSA_WEBSOCKET_V2_TR] Error cannot serialize message with 
serType %s msg type id %i for scope/topic %s/%s", receiver->serType, msgTypeId, 
receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
-        return;
-    }
-
-    psa_websocket_serializer_entry_t *serializer = 
pubsub_websocketAdmin_acquireSerializerForMessageId(receiver->admin, 
receiver->serType, msgTypeId);
+    uint32_t msgId = 
pubsub_serializerHandler_getMsgId(receiver->serializerHandler, hdr->id);
 
-    if(serializer == NULL) {
-        pubsub_websocketAdmin_releaseSerializer(receiver->admin, serializer);
-        L_WARN("[PSA_WEBSOCKET_V2_TR] Error cannot serialize message with 
serType %s msg type id %i for scope/topic %s/%s", receiver->serType, msgTypeId, 
receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+    if (msgId == 0) {
+        L_WARN("Cannot find msg id for msg fqn %s", hdr->id);
         return;
     }
 
     void *deSerializedMsg = NULL;
-
-    celix_version_t* version = 
celix_version_createVersionFromString(serializer->version);
-    bool validVersion = psa_websocket_checkVersion(version, hdr);
-    celix_version_destroy(version);
+    bool validVersion = 
pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, msgId, 
hdr->major, hdr->minor);
     if (validVersion) {
         struct iovec deSerializeBuffer;
         deSerializeBuffer.iov_base = (void *)payload;
         deSerializeBuffer.iov_len  = payloadSize;
-        celix_status_t status = 
serializer->svc->deserialize(serializer->svc->handle, &deSerializeBuffer, 0, 
&deSerializedMsg);
-
+        celix_status_t status = 
pubsub_serializerHandler_deserialize(receiver->serializerHandler, msgId, 
hdr->major, hdr->minor, &deSerializeBuffer, 0, &deSerializedMsg);
         if (status == CELIX_SUCCESS) {
             hash_map_iterator_t iter = 
hashMapIterator_construct(entry->subscriberServices);
             bool release = true;
             while (hashMapIterator_hasNext(&iter)) {
                 pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
-                svc->receive(svc->handle, serializer->fqn, msgTypeId, 
deSerializedMsg, NULL, &release);
+                svc->receive(svc->handle, hdr->id, msgId, deSerializedMsg, 
NULL, &release);
                 if (!release && hashMapIterator_hasNext(&iter)) {
                     //receive function has taken ownership and still more 
receive function to come ..
                     //deserialize again for new message
-                    status = 
serializer->svc->deserialize(serializer->svc->handle, &deSerializeBuffer, 0, 
&deSerializedMsg);
+                    status = 
pubsub_serializerHandler_deserialize(receiver->serializerHandler, msgId, 
hdr->major, hdr->minor, &deSerializeBuffer, 0, &deSerializedMsg);
                     if (status != CELIX_SUCCESS) {
-                        L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type 
%s for scope/topic %s/%s", serializer->fqn, receiver->scope == NULL ? "(null)" 
: receiver->scope, receiver->topic);
+                        L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type 
%s for scope/topic %s/%s", hdr->id, receiver->scope == NULL ? "(null)" : 
receiver->scope, receiver->topic);
                         break;
                     }
                     release = true;
                 }
             }
             if (release) {
-                serializer->svc->freeDeserializedMsg(serializer->svc->handle, 
deSerializedMsg);
+                
pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, 
msgId, deSerializedMsg);
             }
         } else {
-            L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for 
scope/topic %s/%s", serializer->fqn, receiver->scope == NULL ? "(null)" : 
receiver->scope, receiver->topic);
+            L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for 
scope/topic %s/%s", hdr->id, receiver->scope == NULL ? "(null)" : 
receiver->scope, receiver->topic);
         }
+    } else {
+        L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize message '%s' using %s, 
version mismatch. Version received: %i.%i.x, version send: %i.%i.x",
+               hdr->id,
+               
pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
+               (int)hdr->major,
+               (int)hdr->minor,
+               
pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, msgId),
+               
pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, 
msgId));
     }
-
-    pubsub_websocketAdmin_releaseSerializer(receiver->admin, serializer);
 }
 
 static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver, 
const char *msg, size_t msgSize) {
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h
 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h
index 55d5255..f5edda5 100644
--- 
a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h
+++ 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h
@@ -20,8 +20,9 @@
 #ifndef CELIX_PUBSUB_WEBSOCKET_TOPIC_RECEIVER_H
 #define CELIX_PUBSUB_WEBSOCKET_TOPIC_RECEIVER_H
 
-#include <pubsub_admin_metrics.h>
+#include "pubsub_admin_metrics.h"
 #include "celix_bundle_context.h"
+#include "pubsub_serializer_handler.h"
 
 typedef struct pubsub_websocket_topic_receiver 
pubsub_websocket_topic_receiver_t;
 
@@ -30,7 +31,7 @@ pubsub_websocket_topic_receiver_t* 
pubsub_websocketTopicReceiver_create(celix_bu
         const char *scope,
         const char *topic,
         const celix_properties_t *topicProperties,
-        const char *serType,
+        pubsub_serializer_handler_t* serializerHandler,
         void *admin);
 void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t 
*receiver);
 
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
index 98a1ad7..28a8af1 100644
--- 
a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
+++ 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
@@ -52,10 +52,13 @@ struct pubsub_websocket_topic_sender {
     void *admin;
     char *scope;
     char *topic;
-    char *serType;
     char scopeAndTopicFilter[5];
     char *uri;
 
+    pubsub_serializer_handler_t* serializerHandler;
+
+    int seqNr; //atomic
+
     celix_websocket_service_t websockSvc;
     long websockSvcId;
     struct mg_connection *sockConnection;
@@ -71,16 +74,10 @@ struct pubsub_websocket_topic_sender {
     } boundedServices;
 };
 
-typedef struct psa_websocket_send_msg_entry {
-    pubsub_websocket_msg_header_t header; //partially filled header (only 
seqnr and time needs to be updated per send)
-    uint32_t type; //msg type id (hash of fqn)
-} psa_websocket_send_msg_entry_t;
-
 typedef struct psa_websocket_bounded_service_entry {
     pubsub_websocket_topic_sender_t *parent;
     pubsub_publisher_t service;
     long bndId;
-    hash_map_t *msgEntries; //key = msg type id, value = 
psa_websocket_send_msg_entry_t
     int getCount;
 } psa_websocket_bounded_service_entry_t;
 
@@ -99,18 +96,12 @@ pubsub_websocket_topic_sender_t* 
pubsub_websocketTopicSender_create(
         celix_log_helper_t *logHelper,
         const char *scope,
         const char *topic,
-        const char *serType,
+        pubsub_serializer_handler_t* serializerHandler,
         void *admin) {
     pubsub_websocket_topic_sender_t *sender = calloc(1, sizeof(*sender));
     sender->ctx = ctx;
     sender->logHelper = logHelper;
-    sender->serType = celix_utils_strdup(serType);
-
-    if(sender->serType == NULL) {
-        L_ERROR("[PSA_WEBSOCKET_V2_TS] Error getting serType");
-        free(sender);
-        return NULL;
-    }
+    sender->serializerHandler = serializerHandler;
 
     psa_websocket_setScopeAndTopicFilter(scope, topic, 
sender->scopeAndTopicFilter);
     sender->uri = psa_websocket_createURI(scope, topic);
@@ -174,17 +165,7 @@ void 
pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender
         hash_map_iterator_t iter = 
hashMapIterator_construct(sender->boundedServices.map);
         while (hashMapIterator_hasNext(&iter)) {
             psa_websocket_bounded_service_entry_t *entry = 
hashMapIterator_nextValue(&iter);
-            if (entry != NULL) {
-                hash_map_iterator_t iter2 = 
hashMapIterator_construct(entry->msgEntries);
-                while (hashMapIterator_hasNext(&iter2)) {
-                    psa_websocket_send_msg_entry_t *msgEntry = 
hashMapIterator_nextValue(&iter2);
-                    free(msgEntry);
-
-                }
-                hashMap_destroy(entry->msgEntries, false, false);
-
-                free(entry);
-            }
+            free(entry);
         }
         hashMap_destroy(sender->boundedServices.map, false, false);
         celixThreadMutex_unlock(&sender->boundedServices.mutex);
@@ -198,7 +179,6 @@ void 
pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender
         }
         free(sender->topic);
         free(sender->uri);
-        free(sender->serType);
         free(sender);
     }
 }
@@ -216,16 +196,17 @@ const char* 
pubsub_websocketTopicSender_url(pubsub_websocket_topic_sender_t *sen
 }
 
 const char* 
pubsub_websocketTopicSender_serializerType(pubsub_websocket_topic_sender_t 
*sender) {
-    return sender->serType;
+    return 
pubsub_serializerHandler_getSerializationType(sender->serializerHandler);
 }
 
 static int psa_websocket_localMsgTypeIdForMsgType(void* handle, const char* 
msgType, unsigned int* msgTypeId) {
     psa_websocket_bounded_service_entry_t *entry = 
(psa_websocket_bounded_service_entry_t *) handle;
-    int64_t rc = 
pubsub_websocketAdmin_getMessageIdForMessageFqn(entry->parent->admin, 
entry->parent->serType, msgType);
-    if(rc >= 0) {
-        *msgTypeId = (unsigned int)rc;
+    uint32_t msgId = 
pubsub_serializerHandler_getMsgId(entry->parent->serializerHandler, msgType);
+    if (msgId != 0) {
+        *msgTypeId = msgId;
+        return 0;
     }
-    return 0;
+    return -1;
 }
 
 static void* psa_websocket_getPublisherService(void *handle, const 
celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties 
__attribute__((unused))) {
@@ -241,7 +222,6 @@ static void* psa_websocket_getPublisherService(void 
*handle, const celix_bundle_
         entry->getCount = 1;
         entry->parent = sender;
         entry->bndId = bndId;
-        entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
         entry->service.handle = entry;
         entry->service.localMsgTypeIdForMsgType = 
psa_websocket_localMsgTypeIdForMsgType;
         entry->service.send = psa_websocket_topicPublicationSend;
@@ -264,60 +244,40 @@ static void psa_websocket_ungetPublisherService(void 
*handle, const celix_bundle
     if (entry != NULL && entry->getCount == 0) {
         //free entry
         hashMap_remove(sender->boundedServices.map, (void*)bndId);
-
-        hash_map_iterator_t iter = 
hashMapIterator_construct(entry->msgEntries);
-        while (hashMapIterator_hasNext(&iter)) {
-            psa_websocket_send_msg_entry_t *msgEntry = 
hashMapIterator_nextValue(&iter);
-            free(msgEntry);
-        }
-        hashMap_destroy(entry->msgEntries, false, false);
-
         free(entry);
     }
     celixThreadMutex_unlock(&sender->boundedServices.mutex);
 }
 
 static int psa_websocket_topicPublicationSend(void* handle, unsigned int 
msgTypeId, const void *inMsg, celix_properties_t *metadata) {
-    int status = CELIX_SERVICE_EXCEPTION;
     psa_websocket_bounded_service_entry_t *bound = handle;
     pubsub_websocket_topic_sender_t *sender = bound->parent;
 
-    psa_websocket_serializer_entry_t *serializer = 
pubsub_websocketAdmin_acquireSerializerForMessageId(sender->admin, 
sender->serType, msgTypeId);
+    const char* msgFqn;
+    int majorVersion;
+    int minorVersion;
+    celix_status_t status = 
pubsub_serializerHandler_getMsgInfo(sender->serializerHandler, msgTypeId, 
&msgFqn, &majorVersion, &minorVersion);
 
-    if(serializer == NULL) {
-        pubsub_websocketAdmin_releaseSerializer(sender->admin, serializer);
-        L_WARN("[PSA_WEBSOCKET_V2_TS] Error cannot serialize message with 
serType %s msg type id %i for scope/topic %s/%s", sender->serType, msgTypeId, 
sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
-        return CELIX_SERVICE_EXCEPTION;
-    }
-    
-    psa_websocket_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, 
(void *) (uintptr_t) (msgTypeId));
-
-    if(entry == NULL) {
-        entry = calloc(1, sizeof(psa_websocket_send_msg_entry_t));
-        entry->type = msgTypeId;
-        entry->header.id = serializer->fqn;
-        celix_version_t* version = 
celix_version_createVersionFromString(serializer->version);
-        entry->header.major = (uint8_t)celix_version_getMajor(version);
-        entry->header.minor = (uint8_t)celix_version_getMinor(version);
-        entry->header.seqNr = 0;
-        celix_version_destroy(version);
-        hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
+
+    if (status != CELIX_SUCCESS) {
+        L_WARN("Cannot find serializer for msg id %u for serializer %s", 
msgTypeId,
+               
pubsub_serializerHandler_getSerializationType(sender->serializerHandler));
+        return status;
     }
 
     if (sender->sockConnection != NULL) {
         delay_first_send_for_late_joiners(sender);
         size_t serializedOutputLen = 0;
         struct iovec* serializedOutput = NULL;
-        status = serializer->svc->serialize(serializer->svc->handle, inMsg, 
&serializedOutput, &serializedOutputLen);
-
+        status = pubsub_serializerHandler_serialize(sender->serializerHandler, 
msgTypeId, inMsg, &serializedOutput, &serializedOutputLen);
         if (status == CELIX_SUCCESS /*ser ok*/) {
             json_error_t jsError;
 
             json_t *jsMsg = json_object();
-            json_object_set_new_nocheck(jsMsg, "id", 
json_string(entry->header.id));
-            json_object_set_new_nocheck(jsMsg, "major", 
json_integer(entry->header.major));
-            json_object_set_new_nocheck(jsMsg, "minor", 
json_integer(entry->header.minor));
-            uint32_t seqNr = __atomic_fetch_add(&entry->header.seqNr, 1, 
__ATOMIC_RELAXED);
+            json_object_set_new_nocheck(jsMsg, "id", json_string(msgFqn));
+            json_object_set_new_nocheck(jsMsg, "major", 
json_integer(majorVersion));
+            json_object_set_new_nocheck(jsMsg, "minor", 
json_integer(minorVersion));
+            uint32_t seqNr = __atomic_fetch_add(&sender->seqNr, 1, 
__ATOMIC_RELAXED);
             json_object_set_new_nocheck(jsMsg, "seqNr", json_integer(seqNr));
 
             json_t *jsData;
@@ -338,17 +298,15 @@ static int psa_websocket_topicPublicationSend(void* 
handle, unsigned int msgType
             }
 
             json_decref(jsMsg); //Decrease ref count means freeing the object
-            serializer->svc->freeSerializedMsg(serializer->svc->handle, 
serializedOutput, serializedOutputLen);
+            
pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, 
msgTypeId, serializedOutput, serializedOutputLen);
         } else {
-            L_WARN("[PSA_WEBSOCKET_TS] Error serialize message of type %s for 
scope/topic %s/%s",
-                   entry->header.id, sender->scope == NULL ? "(null)" : 
sender->scope, sender->topic);
+            L_WARN("[PSA_WEBSOCKET_TS] Error serialize message of type %u for 
scope/topic %s/%s",
+                   msgTypeId, sender->scope == NULL ? "(null)" : 
sender->scope, sender->topic);
         }
     } else { // when (sender->sockConnection == NULL) we dont have a client, 
but we do have a valid entry
        status = CELIX_SUCCESS; // Not an error, just nothing to do
     }
 
-    pubsub_websocketAdmin_releaseSerializer(sender->admin, serializer);
-
     return status;
 }
 
diff --git 
a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h
index 8f8cebf..6b42500 100644
--- 
a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h
+++ 
b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h
@@ -22,6 +22,7 @@
 
 #include "celix_bundle_context.h"
 #include "pubsub_admin_metrics.h"
+#include "pubsub_serializer_handler.h"
 
 typedef struct pubsub_websocket_topic_sender pubsub_websocket_topic_sender_t;
 
@@ -30,7 +31,7 @@ pubsub_websocket_topic_sender_t* 
pubsub_websocketTopicSender_create(
         celix_log_helper_t *logHelper,
         const char *scope,
         const char *topic,
-        const char *serType,
+        pubsub_serializer_handler_t* serializerHandler,
         void *admin);
 void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t 
*sender);
 
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c 
b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
index a00ce34..9d2070f 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
@@ -452,6 +452,10 @@ static inline void 
processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *rec
     int updateSerError = 0;
 
     const char* msgFqn = 
pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, 
message->header.msgId);
+    if (msgFqn == NULL) {
+        L_WARN("Cannot find msg fqn for msg id %u", message->header.msgId);
+        return;
+    }
 
     void *deserializedMsg = NULL;
     bool validVersion = 
pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, 
message->header.msgId, message->header.msgMajorVersion, 
message->header.msgMinorVersion);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c 
b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
index 8ae4489..afc8e54 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
@@ -498,6 +498,7 @@ static int psa_zmq_topicPublicationSend(void* handle, 
unsigned int msgTypeId, co
     pubsub_zmq_topic_sender_t *sender = bound->parent;
     bool monitor = sender->metricsEnabled;
 
+    //TODO remove use of entry, so that one less lock is needed and drop 
metrics stuff
     psa_zmq_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, 
(void*)(uintptr_t)(msgTypeId));
 
     //metrics updates
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c 
b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
index 418b4b5..7d3d0f3 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
@@ -42,7 +42,7 @@ typedef struct pubsub_serialization_service_entry {
     const celix_properties_t *properties;
     uint32_t msgId;
     celix_version_t* msgVersion;
-    char* msgFqn;
+    const char* msgFqn;
     pubsub_message_serialization_service_t* svc;
 } pubsub_serialization_service_entry_t;
 
@@ -185,7 +185,6 @@ static void pubsub_serializerHandler_destroyCallback(void* 
data) {
         celix_array_list_t *entries = hashMapIterator_nextValue(&iter);
         for (int i = 0; i < celix_arrayList_size(entries); ++i) {
             pubsub_serialization_service_entry_t* entry = 
celix_arrayList_get(entries, i);
-            free(entry->msgFqn);
             celix_version_destroy(entry->msgVersion);
             free(entry);
         }
@@ -239,6 +238,12 @@ void 
pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_
     }
 
     if (valid) {
+        char* fqn = hashMap_get(handler->msgFullyQualifiedNames, 
(void*)(uintptr_t)msgId);
+        if (fqn == NULL) {
+            fqn = celix_utils_strdup(msgFqn);
+            hashMap_put(handler->msgFullyQualifiedNames, 
(void*)(uintptr_t)msgId, fqn);
+        }
+
         celix_array_list_t *entries = 
hashMap_get(handler->serializationServices, (void *) (uintptr_t) msgId);
         if (entries == NULL) {
             entries = celix_arrayList_create();
@@ -246,7 +251,7 @@ void 
pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_
         pubsub_serialization_service_entry_t *entry = calloc(1, 
sizeof(*entry));
         entry->svcId = svcId;
         entry->properties = svcProperties;
-        entry->msgFqn = celix_utils_strdup(msgFqn);
+        entry->msgFqn = fqn;
         entry->msgId = msgId;
         entry->msgVersion = msgVersion;
         entry->svc = svc;
@@ -258,11 +263,6 @@ void 
pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_
         celix_version_destroy(msgVersion);
     }
 
-    char* fqn = hashMap_get(handler->msgFullyQualifiedNames, 
(void*)(uintptr_t)msgId);
-    if (fqn == NULL) {
-        hashMap_put(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId, 
celix_utils_strdup(msgFqn));
-    }
-
     celixThreadRwlock_unlock(&handler->lock);
 }
 
@@ -288,7 +288,6 @@ void 
pubsub_serializerHandler_removeSerializationService(pubsub_serializer_handl
             }
         }
         if (found != NULL) {
-            free(found->msgFqn);
             celix_version_destroy(found->msgVersion);
             free(found);
         }

Reply via email to