CELIX-408: Major refactoring of the pubsub serializer design and usage. The 
examples for udpmc and zmq seem to be working. Combi and zmq multicast are not 
stable yet.

- The serializer is refactored to minimize the needed callbacks directly to the 
services. The serializer is now a two-step approach.
  The serializer service can be used to create a map of msg serializers.
  The msg serializers are serializer structs (with function ptrs), each used to 
serialize a specific msg.
- Where feasible, replace _pt types with _t types. Removing the pointer from the 
typedef makes it possible to add const info and to use sizeof on the typedef.


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/7efe4331
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/7efe4331
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/7efe4331

Branch: refs/heads/develop
Commit: 7efe4331aafdfea1df8bd181c136f20acaf52b49
Parents: 97df926
Author: Pepijn Noltes <[email protected]>
Authored: Tue Apr 11 21:19:10 2017 +0200
Committer: Pepijn Noltes <[email protected]>
Committed: Tue Apr 11 21:19:10 2017 +0200

----------------------------------------------------------------------
 cmake/cmake_celix/BundlePackaging.cmake         |   2 +-
 dfi/private/src/json_serializer.c               |   8 +-
 dfi/public/include/json_serializer.h            |   4 +-
 pubsub/pubsub_admin_udp_mc/CMakeLists.txt       |   1 -
 .../private/include/pubsub_admin_impl.h         |   6 +-
 .../include/pubsub_publish_service_private.h    |   8 +-
 .../private/include/topic_subscription.h        |   6 +-
 .../private/src/pubsub_admin_impl.c             |   4 +-
 .../private/src/topic_publication.c             | 173 +++++++-------
 .../private/src/topic_subscription.c            | 158 +++++++------
 pubsub/pubsub_admin_zmq/CMakeLists.txt          |   1 -
 .../private/include/pubsub_admin_impl.h         |   6 +-
 .../include/pubsub_publish_service_private.h    |   6 +-
 .../private/include/topic_subscription.h        |   6 +-
 .../private/src/pubsub_admin_impl.c             |   4 +-
 .../private/src/topic_publication.c             | 137 ++++++-----
 .../private/src/topic_subscription.c            | 184 ++++++---------
 .../public/include/dyn_msg_utils.h              |  39 ----
 .../pubsub_common/public/include/pubsub_admin.h |   4 +-
 .../public/include/pubsub_serializer.h          |  47 ++--
 pubsub/pubsub_common/public/src/dyn_msg_utils.c | 160 -------------
 pubsub/pubsub_serializer_json/CMakeLists.txt    |   1 -
 .../private/include/pubsub_serializer_impl.h    |  35 ++-
 .../private/src/ps_activator.c                  |  20 +-
 .../private/src/pubsub_serializer_impl.c        | 234 +++++++++++++++----
 .../private/src/pubsub_topology_manager.c       |   6 +-
 pubsub/test/msg_descriptors/msg.descriptor      |   2 +-
 pubsub/test/test/tst_activator.cpp              |   9 +-
 28 files changed, 582 insertions(+), 689 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/cmake/cmake_celix/BundlePackaging.cmake
----------------------------------------------------------------------
diff --git a/cmake/cmake_celix/BundlePackaging.cmake 
b/cmake/cmake_celix/BundlePackaging.cmake
index ded1ca5..7eb42fa 100644
--- a/cmake/cmake_celix/BundlePackaging.cmake
+++ b/cmake/cmake_celix/BundlePackaging.cmake
@@ -319,7 +319,7 @@ function(bundle_libs)
             if (ADD_TO_MANIFEST)
                 list(APPEND LIBS "$<TARGET_SONAME_FILE_NAME:${LIB}>")
             endif()
-                list(APPEND DEPS "${OUT}") #NOTE depending on ${OUT} not on 
$<TARGET_FILE:${LIB}>.
+            list(APPEND DEPS "${OUT}") #NOTE depending on ${OUT} not on 
$<TARGET_FILE:${LIB}>.
         endif()
 
         get_target_property(IS_LIB ${BUNDLE} "BUNDLE_TARGET_IS_LIB")

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/dfi/private/src/json_serializer.c
----------------------------------------------------------------------
diff --git a/dfi/private/src/json_serializer.c 
b/dfi/private/src/json_serializer.c
index 0c06998..c1cd339 100644
--- a/dfi/private/src/json_serializer.c
+++ b/dfi/private/src/json_serializer.c
@@ -280,7 +280,7 @@ static int jsonSerializer_parseSequence(dyn_type *seq, 
json_t *array, void *seqL
     return status;
 }
 
-int jsonSerializer_serialize(dyn_type *type, void *input, char **output) {
+int jsonSerializer_serialize(dyn_type *type, const void* input, char **output) 
{
     int status = OK;
 
     json_t *root = NULL;
@@ -294,11 +294,11 @@ int jsonSerializer_serialize(dyn_type *type, void *input, 
char **output) {
     return status;
 }
 
-int jsonSerializer_serializeJson(dyn_type *type, void *input, json_t **out) {
-    return jsonSerializer_writeAny(type, input, out);
+int jsonSerializer_serializeJson(dyn_type *type, const void* input, json_t 
**out) {
+    return jsonSerializer_writeAny(type, (void*)input /*TODO update static 
function to take const void**/, out);
 }
 
-static int jsonSerializer_writeAny(dyn_type *type, void *input, json_t **out) {
+static int jsonSerializer_writeAny(dyn_type *type, void* input, json_t **out) {
     int status = OK;
 
     int descriptor = dynType_descriptorType(type);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/dfi/public/include/json_serializer.h
----------------------------------------------------------------------
diff --git a/dfi/public/include/json_serializer.h 
b/dfi/public/include/json_serializer.h
index c785b01..2f91f2b 100644
--- a/dfi/public/include/json_serializer.h
+++ b/dfi/public/include/json_serializer.h
@@ -31,7 +31,7 @@ DFI_SETUP_LOG_HEADER(jsonSerializer);
 int jsonSerializer_deserialize(dyn_type *type, const char *input, void 
**result);
 int jsonSerializer_deserializeJson(dyn_type *type, json_t *input, void 
**result);
 
-int jsonSerializer_serialize(dyn_type *type, void *input, char **output);
-int jsonSerializer_serializeJson(dyn_type *type, void *input, json_t **out);
+int jsonSerializer_serialize(dyn_type *type, const void* input, char **output);
+int jsonSerializer_serializeJson(dyn_type *type, const void* input, json_t 
**out);
 
 #endif

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/CMakeLists.txt 
b/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
index ce32db0..1ac0c2d 100644
--- a/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
+++ b/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
@@ -35,7 +35,6 @@ add_bundle(org.apache.celix.pubsub_admin.PubSubAdminUdpMc
        private/src/topic_subscription.c
        private/src/topic_publication.c
        private/src/large_udp.c
-       ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c
        ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
        ${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
 )

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h 
b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
index 9eddf15..89e6547 100644
--- a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
+++ b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_admin_impl.h
@@ -33,7 +33,7 @@
 
 struct pubsub_admin {
 
-       pubsub_serializer_service_pt serializerSvc;
+       pubsub_serializer_service_t* serializerSvc;
 
        bundle_context_pt bundle_context;
        log_helper_pt loghelper;
@@ -73,7 +73,7 @@ celix_status_t 
pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* sco
 celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, 
pubsub_endpoint_pt pubEP, double* score);
 celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, 
pubsub_endpoint_pt subEP, double* score);
 
-celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_t* serializerSvc);
 
 #endif /* PUBSUB_ADMIN_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
----------------------------------------------------------------------
diff --git 
a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h 
b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
index 81690b8..57d942a 100644
--- 
a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
+++ 
b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
@@ -39,17 +39,17 @@ typedef struct pubsub_udp_msg {
     struct pubsub_msg_header header;
     unsigned int payloadSize;
     char payload[];
-} *pubsub_udp_msg_pt;
+} pubsub_udp_msg_t;
 
 typedef struct topic_publication *topic_publication_pt;
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, 
pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* 
bindIP, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, 
pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* 
bindIP, topic_publication_pt *out);
 celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
 
 celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep);
 celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep);
 
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, 
pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt 
pub, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, 
pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt 
pub, pubsub_serializer_service_t* serializerSvc);
 
 celix_status_t pubsub_topicPublicationStart(bundle_context_pt 
bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
 celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h 
b/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
index 36c902e..1535ae5 100644
--- a/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
+++ b/pubsub/pubsub_admin_udp_mc/private/include/topic_subscription.h
@@ -38,7 +38,7 @@
 
 typedef struct topic_subscription* topic_subscription_pt;
 
-celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt 
bundle_context, pubsub_serializer_service_pt serializer, char* scope, char* 
topic,topic_subscription_pt* out);
+celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt 
bundle_context, pubsub_serializer_service_t* serializer, char* scope, char* 
topic,topic_subscription_pt* out);
 celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
@@ -49,8 +49,8 @@ celix_status_t 
pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
 celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, 
pubsub_endpoint_pt subEP);
 celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt 
ts, pubsub_endpoint_pt subEP);
 
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, 
pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt 
ts, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, 
pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt 
ts, pubsub_serializer_service_t* serializerSvc);
 
 celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt 
subscription);
 celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt 
subscription);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c 
b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
index 6e9f4e8..ebfe3e6 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
@@ -645,7 +645,7 @@ celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt 
admin, pubsub_endpoin
        return status;
 }
 
-celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_t* serializerSvc){
        celix_status_t status = CELIX_SUCCESS;
        admin->serializerSvc = serializerSvc;
 
@@ -673,7 +673,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt 
admin, pubsub_serialize
        return status;
 }
 
-celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_t* serializerSvc){
        celix_status_t status = CELIX_SUCCESS;
        admin->serializerSvc = NULL;
 

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c 
b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
index aa3faf0..be0a433 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
@@ -37,7 +37,6 @@
 #include "array_list.h"
 #include "celixbool.h"
 #include "service_registration.h"
-#include "dyn_msg_utils.h"
 #include "utils.h"
 #include "service_factory.h"
 #include "version.h"
@@ -59,7 +58,7 @@ struct topic_publication {
        hash_map_pt boundServices; //<bundle_pt,bound_service>
        celix_thread_mutex_t tp_lock;
        struct sockaddr_in destAddr;
-       pubsub_serializer_service_pt serializerSvc;
+       pubsub_serializer_service_t* serializerSvc;
 };
 
 typedef struct publish_bundle_bound_service {
@@ -68,27 +67,27 @@ typedef struct publish_bundle_bound_service {
        bundle_pt bundle;
     char *scope;
        char *topic;
-       hash_map_pt msgTypes;
        unsigned short getCount;
        celix_thread_mutex_t mp_lock;
        bool mp_send_in_progress;
        array_list_pt mp_parts;
        largeUdp_pt largeUdpHandle;
-}* publish_bundle_bound_service_pt;
+       pubsub_msg_serializer_map_t* map;
+} publish_bundle_bound_service_t;
 
 typedef struct pubsub_msg{
        pubsub_msg_header_pt header;
        char* payload;
        int payloadSize;
-}* pubsub_msg_pt;
+} pubsub_msg_t;
 
 static unsigned int rand_range(unsigned int min, unsigned int max);
 
 static celix_status_t pubsub_topicPublicationGetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service);
 static celix_status_t pubsub_topicPublicationUngetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service);
 
-static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle);
-static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt 
boundSvc);
+static publish_bundle_bound_service_t* 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle);
+static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* 
boundSvc);
 
 static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, 
const void *msg);
 
@@ -98,7 +97,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const 
char* msgType, unsig
 static void delay_first_send_for_late_joiners(void);
 
 
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, 
pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* 
bindIP, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, 
pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* 
bindIP, topic_publication_pt *out){
 
     char* ep = malloc(EP_ADDRESS_LEN);
     memset(ep,0,EP_ADDRESS_LEN);
@@ -136,7 +135,7 @@ celix_status_t 
pubsub_topicPublicationDestroy(topic_publication_pt pub){
 
        hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices);
        while(hashMapIterator_hasNext(iter)){
-               publish_bundle_bound_service_pt bound = 
hashMapIterator_nextValue(iter);
+               publish_bundle_bound_service_t* bound = 
hashMapIterator_nextValue(iter);
                pubsub_destroyPublishBundleBoundService(bound);
        }
        hashMapIterator_destroy(iter);
@@ -223,41 +222,49 @@ celix_status_t 
pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub
        return CELIX_SUCCESS;
 }
 
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, 
pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, 
pubsub_serializer_service_t* serializerSvc){
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&(pub->tp_lock));
 
-       pub->serializerSvc = serializerSvc;
+    //clear old serializer
+    if (pub->serializerSvc != NULL) {
+        hash_map_iterator_t iter = 
hashMapIterator_construct(pub->boundServices); //key = bundle , value = svc
+        while (hashMapIterator_hasNext(&iter)) {
+            publish_bundle_bound_service_t* bound = 
hashMapIterator_nextValue(&iter);
+                       
pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, 
bound->map);
+                       bound->map = NULL;
+        }
+    }
 
-       hash_map_iterator_pt bs_iter = 
hashMapIterator_create(pub->boundServices);
-       while(hashMapIterator_hasNext(bs_iter)){
-               publish_bundle_bound_service_pt boundSvc = 
(publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter);
-               if (hashMap_size(boundSvc->msgTypes) == 0){
-                       
pub->serializerSvc->fillMsgTypesMap(pub->serializerSvc->serializer, 
boundSvc->msgTypes, boundSvc->bundle);
-               }
+    //setup new serializer
+       pub->serializerSvc = serializerSvc;
+       hash_map_iterator_t iter = 
hashMapIterator_construct(pub->boundServices);
+       while (hashMapIterator_hasNext(&iter)) {
+               hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+               bundle_pt bundle = hashMapEntry_getKey(entry);
+               publish_bundle_bound_service_t* bound = 
hashMapEntry_getValue(entry);
+               
pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, 
&bound->map);
        }
-       hashMapIterator_destroy(bs_iter);
 
        celixThreadMutex_unlock(&(pub->tp_lock));
 
        return status;
 }
 
-celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt 
pub, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt 
pub, pubsub_serializer_service_t* svc){
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&(pub->tp_lock));
-
-       hash_map_iterator_pt bs_iter = 
hashMapIterator_create(pub->boundServices);
-       while(hashMapIterator_hasNext(bs_iter)){
-               publish_bundle_bound_service_pt boundSvc = 
(publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter);
-               
pub->serializerSvc->emptyMsgTypesMap(pub->serializerSvc->serializer, 
boundSvc->msgTypes);
-       }
-       hashMapIterator_destroy(bs_iter);
-
+    if (pub->serializerSvc == svc) {
+        hash_map_iterator_t iter = 
hashMapIterator_construct(pub->boundServices);
+        while (hashMapIterator_hasNext(&iter)) {
+            publish_bundle_bound_service_t* bound = 
hashMapIterator_nextValue(&iter);
+            
pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, 
bound->map);
+                       bound->map = NULL;
+        }
+    }
        pub->serializerSvc = NULL;
-
        celixThreadMutex_unlock(&(pub->tp_lock));
 
        return status;
@@ -275,18 +282,18 @@ static celix_status_t 
pubsub_topicPublicationGetService(void* handle, bundle_pt
 
        celixThreadMutex_lock(&(publish->tp_lock));
 
-       publish_bundle_bound_service_pt bound = 
(publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
-       if(bound==NULL){
-               bound = pubsub_createPublishBundleBoundService(publish,bundle);
-               if(bound!=NULL){
-                       hashMap_put(publish->boundServices,bundle,bound);
+       publish_bundle_bound_service_t* bound = 
hashMap_get(publish->boundServices, bundle);
+       if (bound == NULL) {
+               bound = pubsub_createPublishBundleBoundService(publish, bundle);
+               if (bound != NULL) {
+                       hashMap_put(publish->boundServices, bundle, bound);
                }
        }
-       else{
+       else {
                bound->getCount++;
        }
 
-       if(bound!=NULL){
+       if (bound != NULL) {
                *service = bound->service;
        }
 
@@ -301,17 +308,16 @@ static celix_status_t 
pubsub_topicPublicationUngetService(void* handle, bundle_p
 
        celixThreadMutex_lock(&(publish->tp_lock));
 
-       publish_bundle_bound_service_pt bound = 
(publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
-       if(bound!=NULL){
+       publish_bundle_bound_service_t* bound = 
hashMap_get(publish->boundServices, bundle);
+       if (bound != NULL) {
 
                bound->getCount--;
-               if(bound->getCount==0){
+               if (bound->getCount == 0) {
                        pubsub_destroyPublishBundleBoundService(bound);
                        hashMap_remove(publish->boundServices,bundle);
                }
-
        }
-       else{
+       else {
                long bundleId = -1;
                bundle_getBundleId(bundle,&bundleId);
                printf("TP: Unexpected ungetService call for bundle %ld.\n", 
bundleId);
@@ -325,12 +331,11 @@ static celix_status_t 
pubsub_topicPublicationUngetService(void* handle, bundle_p
        return CELIX_SUCCESS;
 }
 
-static bool send_pubsub_msg(publish_bundle_bound_service_pt bound, 
pubsub_msg_pt msg, bool last, pubsub_release_callback_t *releaseCallback){
+static bool send_pubsub_msg(publish_bundle_bound_service_t* bound, 
pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback){
        const int iovec_len = 3; // header + size + payload
        bool ret = true;
-       pubsub_udp_msg_pt udpMsg;
 
-       int compiledMsgSize = sizeof(*udpMsg) + msg->payloadSize;
+       int compiledMsgSize = sizeof(pubsub_udp_msg_t) + msg->payloadSize;
 
        struct iovec msg_iovec[iovec_len];
        msg_iovec[0].iov_base = msg->header;
@@ -348,51 +353,51 @@ static bool 
send_pubsub_msg(publish_bundle_bound_service_pt bound, pubsub_msg_pt
            ret = false;
        }
 
-       //free(udpMsg);
        if(releaseCallback) {
            releaseCallback->release(msg->payload, bound);
        }
        return ret;
-
 }
 
 
 static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, 
const void *msg) {
     int status = 0;
-    publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) 
handle;
+    publish_bundle_bound_service_t* bound = handle;
 
     celixThreadMutex_lock(&(bound->parent->tp_lock));
     celixThreadMutex_lock(&(bound->mp_lock));
 
-    //TODO //FIXME -> should use pointer to int as identifier, can be many 
pointers to int ....
-    printf("TODO FIX usage of msg id's in the serializer hashmap. This seems 
wrongly based on pointers to uints!!!!\n");
-    pubsub_message_type *msgType = hashMap_get(bound->msgTypes, &msgTypeId);
-
-    int major=0, minor=0;
+    pubsub_msg_serializer_t *msgSer = NULL;
+    if (bound->map != NULL) {
+        msgSer = hashMap_get(bound->map->serializers, (void 
*)(uintptr_t)msgTypeId);
+    }
 
-    if (msgType != NULL && bound->parent->serializerSvc != NULL) {
+    if (bound->map == NULL) {
+        printf("TP: Serializer is not set!\n");
+    } else if (msgSer == NULL ){
+        printf("TP: No msg serializer available for msg type id %d\n", 
msgTypeId);
+    }
 
-       version_pt msgVersion = 
bound->parent->serializerSvc->getVersion(bound->parent->serializerSvc->serializer,
 msgType);
+    int major=0, minor=0;
 
+    if (msgSer != NULL) {
                pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct 
pubsub_msg_header));
-
                strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
-
                msg_hdr->type = msgTypeId;
-               if (msgVersion != NULL){
-                       version_getMajor(msgVersion, &major);
-                       version_getMinor(msgVersion, &minor);
+               if (msgSer->msgVersion != NULL){
+                       version_getMajor(msgSer->msgVersion, &major);
+                       version_getMinor(msgSer->msgVersion, &minor);
                        msg_hdr->major = major;
                        msg_hdr->minor = minor;
                }
 
-               void* serializedOutput = NULL;
-               int serializedOutputLen = 0;
-               
bound->parent->serializerSvc->serialize(bound->parent->serializerSvc->serializer,
 msgType, msg, &serializedOutput, &serializedOutputLen);
+               char* serializedOutput = NULL;
+               size_t serializedOutputLen = 0;
+        msgSer->serialize(msgSer->handle, msg, &serializedOutput, 
&serializedOutputLen);
 
-               pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg));
+               pubsub_msg_t* msg = calloc(1,sizeof(struct pubsub_msg));
                msg->header = msg_hdr;
-               msg->payload = (char *) serializedOutput;
+               msg->payload = serializedOutput;
                msg->payloadSize = serializedOutputLen;
 
                if(send_pubsub_msg(bound, msg,true, NULL) == false) {
@@ -403,11 +408,7 @@ static int pubsub_topicPublicationSend(void* handle, 
unsigned int msgTypeId, con
                free(serializedOutput);
 
     } else {
-               if (bound->parent->serializerSvc == NULL) {
-                       printf("TP: Serializer is not set!\n");
-               } else {
-                       printf("TP: Message %u not supported.\n",msgTypeId);
-               }
+        printf("TP: Message %u not supported.\n",msgTypeId);
         status=-1;
     }
 
@@ -430,9 +431,9 @@ static unsigned int rand_range(unsigned int min, unsigned 
int max){
 
 }
 
-static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle){
+static publish_bundle_bound_service_t* 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle) {
 
-       publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
+       publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound));
 
        if (bound != NULL) {
                bound->service = calloc(1, sizeof(*bound->service));
@@ -445,7 +446,6 @@ static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(to
                bound->getCount = 1;
                bound->mp_send_in_progress = false;
                celixThreadMutex_create(&bound->mp_lock,NULL);
-               bound->msgTypes = hashMap_create(uintHash, NULL, uintEquals, 
NULL); //<int* (msgId),pubsub_message_type>
                arrayList_create(&bound->mp_parts);
 
                pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
@@ -457,10 +457,10 @@ static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(to
                bound->service->send = pubsub_topicPublicationSend;
                bound->service->sendMultipart = NULL;  //Multipart not 
supported (jet) for UDP
 
-               if (tp->serializerSvc != NULL){
-                       
tp->serializerSvc->fillMsgTypesMap(tp->serializerSvc->serializer, 
bound->msgTypes,bound->bundle);
+        //TODO check if lock on tp is needed? (e.g. is lock already done by 
caller?)
+               if (tp->serializerSvc != NULL) {
+            tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, 
bundle, &bound->map);
                }
-
        }
        else
        {
@@ -474,30 +474,33 @@ static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(to
        return bound;
 }
 
-static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt 
boundSvc){
+static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* 
boundSvc) {
 
        celixThreadMutex_lock(&boundSvc->mp_lock);
 
-       if(boundSvc->service != NULL){
+       if (boundSvc->service != NULL) {
                free(boundSvc->service);
        }
 
-       if(boundSvc->msgTypes != NULL){
-               if (boundSvc->parent->serializerSvc != NULL){
-                       
boundSvc->parent->serializerSvc->emptyMsgTypesMap(boundSvc->parent->serializerSvc->serializer,
 boundSvc->msgTypes);
-               }
-               hashMap_destroy(boundSvc->msgTypes,false,false);
-       }
+    //TODO check if lock on parent is needed, e.g. does the caller already 
lock?
+    if (boundSvc->map != NULL) {
+        if (boundSvc->parent->serializerSvc == NULL) {
+            printf("TP: Cannot destroy pubsub msg serializer map. No serliazer 
service\n");
+        } else {
+            
boundSvc->parent->serializerSvc->destroySerializerMap(boundSvc->parent->serializerSvc->handle,
 boundSvc->map);
+            boundSvc->map = NULL;
+        }
+    }
 
-       if(boundSvc->mp_parts!=NULL){
+       if (boundSvc->mp_parts!=NULL) {
                arrayList_destroy(boundSvc->mp_parts);
        }
 
-    if(boundSvc->scope!=NULL){
+    if (boundSvc->scope!=NULL) {
         free(boundSvc->scope);
     }
 
-    if(boundSvc->topic!=NULL){
+    if (boundSvc->topic!=NULL) {
                free(boundSvc->topic);
        }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c 
b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
index 91bce9f..da23b21 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
@@ -49,7 +49,6 @@
 #include "topic_subscription.h"
 #include "subscriber.h"
 #include "publisher.h"
-#include "dyn_msg_utils.h"
 #include "pubsub_publish_service_private.h"
 #include "large_udp.h"
 
@@ -68,11 +67,15 @@ struct topic_subscription{
        celix_thread_mutex_t ts_lock;
        bundle_context_pt context;
        int topicEpollFd; // EPOLL filedescriptor where the sockets are 
registered.
-       hash_map_pt servicesMap; // key = service, value = msg types map
+
+    //NOTE: using a service ptr as key can be dangerous, because a pointer can be 
reused.
+    //Ensuring that pointers are removed before a new (reused) pointer comes 
along is crucial!
+       hash_map_pt msgSerializersMap; // key = service ptr, value = 
pubsub_msg_serializer_map_t*
+
        hash_map_pt socketMap; // key = URL, value = listen-socket
        unsigned int nrSubscribers;
        largeUdp_pt largeUdpHandle;
-       pubsub_serializer_service_pt serializerSvc;
+       pubsub_serializer_service_t* serializerSvc;
 
 };
 
@@ -94,7 +97,7 @@ static void sigusr1_sighandler(int signo);
 static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, 
unsigned int* msgTypeId);
 
 
-celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt 
bundle_context, pubsub_serializer_service_pt serializer, char* scope, char* 
topic,topic_subscription_pt* out){
+celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt 
bundle_context, pubsub_serializer_service_t* serializer, char* scope, char* 
topic,topic_subscription_pt* out){
        celix_status_t status = CELIX_SUCCESS;
 
        topic_subscription_pt ts = (topic_subscription_pt) 
calloc(1,sizeof(*ts));
@@ -115,7 +118,7 @@ celix_status_t pubsub_topicSubscriptionCreate(char* 
ifIp,bundle_context_pt bundl
 
        celixThreadMutex_create(&ts->ts_lock,NULL);
        arrayList_create(&ts->sub_ep_list);
-       ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL);
+       ts->msgSerializersMap = hashMap_create(NULL, NULL, NULL, NULL);
        ts->socketMap =  hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
 
        ts->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS);
@@ -161,7 +164,7 @@ celix_status_t 
pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
        serviceTracker_destroy(ts->tracker);
        arrayList_clear(ts->sub_ep_list);
        arrayList_destroy(ts->sub_ep_list);
-       hashMap_destroy(ts->servicesMap,false,false);
+       hashMap_destroy(ts->msgSerializersMap,false,false);
 
        hashMap_destroy(ts->socketMap,false,false);
        largeUdp_destroy(ts->largeUdpHandle);
@@ -391,7 +394,7 @@ unsigned int 
pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
        return ts->nrSubscribers;
 }
 
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, 
pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, 
pubsub_serializer_service_t* serializerSvc){
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&ts->ts_lock);
@@ -403,50 +406,39 @@ celix_status_t 
pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, p
        return status;
 }
 
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt 
ts, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt 
ts, pubsub_serializer_service_t* serializerSvc){
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&ts->ts_lock);
-
-       hash_map_iterator_pt svc_iter = hashMapIterator_create(ts->servicesMap);
-       while(hashMapIterator_hasNext(svc_iter)){
-               hash_map_pt msgTypes = (hash_map_pt) 
hashMapIterator_nextValue(svc_iter);
-               if (hashMap_size(msgTypes) > 0){
-                       
ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes);
+       if (ts->serializerSvc == serializerSvc) { //only act if svc removed is 
services used
+               hash_map_iterator_t iter = 
hashMapIterator_construct(ts->msgSerializersMap);
+               while (hashMapIterator_hasNext(&iter)) {
+            pubsub_msg_serializer_map_t* map = 
hashMapIterator_nextValue(&iter);
+            ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, 
map);
                }
+               ts->serializerSvc = NULL;
        }
-       hashMapIterator_destroy(svc_iter);
-
-       ts->serializerSvc = NULL;
-
        celixThreadMutex_unlock(&ts->ts_lock);
 
        return status;
 }
 
-static celix_status_t topicsub_subscriberTracked(void * handle, 
service_reference_pt reference, void * service){
+static celix_status_t topicsub_subscriberTracked(void * handle, 
service_reference_pt reference, void* svc){
        celix_status_t status = CELIX_SUCCESS;
        topic_subscription_pt ts = handle;
 
        celixThreadMutex_lock(&ts->ts_lock);
-       if (!hashMap_containsKey(ts->servicesMap, service)) {
-               hash_map_pt msgTypes = hashMap_create(uintHash, NULL, 
uintEquals, NULL); //key = msgId, value = pubsub_message_type
-
+       if (!hashMap_containsKey(ts->msgSerializersMap, svc)) {
                bundle_pt bundle = NULL;
                serviceReference_getBundle(reference, &bundle);
 
-               if (ts->serializerSvc != NULL){
-                       
ts->serializerSvc->fillMsgTypesMap(ts->serializerSvc->serializer, 
msgTypes,bundle);
-               }
-
-               if(hashMap_size(msgTypes)==0){ //If the msgTypes hashMap is not 
filled, the service is an unsupported subscriber
-                       hashMap_destroy(msgTypes,false,false);
-                       printf("TS: Unsupported subscriber!\n");
-               }
-               else{
-                       hashMap_put(ts->servicesMap, service, msgTypes);
+               if (ts->serializerSvc != NULL) {
+            pubsub_msg_serializer_map_t* map = NULL;
+            ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, 
bundle, &map);
+            if (map != NULL) {
+                hashMap_put(ts->msgSerializersMap, svc, map);
+            }
                }
-
        }
        celixThreadMutex_unlock(&ts->ts_lock);
        printf("TS: New subscriber registered.\n");
@@ -454,18 +446,16 @@ static celix_status_t topicsub_subscriberTracked(void * 
handle, service_referenc
 
 }
 
-static celix_status_t topicsub_subscriberUntracked(void * handle, 
service_reference_pt reference, void * service){
+static celix_status_t topicsub_subscriberUntracked(void * handle, 
service_reference_pt reference, void* svc){
        celix_status_t status = CELIX_SUCCESS;
        topic_subscription_pt ts = handle;
 
-       celixThreadMutex_lock(&ts->ts_lock);
-       if (hashMap_containsKey(ts->servicesMap, service)) {
-               hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service);
-               if(msgTypes!=NULL){
-                       if (ts->serializerSvc != NULL){
-                               
ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes);
-                       }
-                       hashMap_destroy(msgTypes,false,false);
+
+    celixThreadMutex_lock(&ts->ts_lock);
+       if (hashMap_containsKey(ts->msgSerializersMap, svc)) {
+               pubsub_msg_serializer_map_t* map = 
hashMap_remove(ts->msgSerializersMap, svc);
+               if (ts->serializerSvc != NULL){
+                       
ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
                }
        }
        celixThreadMutex_unlock(&ts->ts_lock);
@@ -475,59 +465,51 @@ static celix_status_t topicsub_subscriberUntracked(void * 
handle, service_refere
 }
 
 
-static void process_msg(topic_subscription_pt sub,pubsub_udp_msg_pt msg){
+static void process_msg(topic_subscription_pt sub, pubsub_udp_msg_t* msg){
 
-       hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap);
-       while (hashMapIterator_hasNext(iter)) {
-               hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+       hash_map_iterator_t iter = 
hashMapIterator_construct(sub->msgSerializersMap);
+       celixThreadMutex_lock(&sub->ts_lock);
+       while (hashMapIterator_hasNext(&iter)) {
+               hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
                pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry);
-               hash_map_pt msgTypes = hashMapEntry_getValue(entry);
+               pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
 
-               pubsub_message_type *msgType = 
hashMap_get(msgTypes,&(msg->header.type));
+               pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, 
(void *)(uintptr_t )msg->header.type);
 
-               if (msgType == NULL) {
+               if (msgSer == NULL) {
                        printf("TS: Primary message %d not supported. NOT 
receiving any part of the whole message.\n",msg->header.type);
-               }
-               else if (sub->serializerSvc == NULL){
-                       printf("TS: No active serializer service found!\n");
-               }
-               else{
+               } else {
                        void *msgInst = NULL;
-                       char *name = 
sub->serializerSvc->getName(sub->serializerSvc->serializer, msgType);
-                       version_pt msgVersion = 
sub->serializerSvc->getVersion(sub->serializerSvc->serializer, msgType);
-
-                       bool validVersion = 
checkVersion(msgVersion,&msg->header);
-
+                       bool validVersion = checkVersion(msgSer->msgVersion, 
&msg->header);
                        if(validVersion){
-                               celix_status_t status = 
sub->serializerSvc->deserialize(sub->serializerSvc->serializer, msgType, (const 
void *) msg->payload, &msgInst);
-
+                celix_status_t status = msgSer->deserialize(msgSer->handle, 
msg->payload, 0, &msgInst);
                                if (status == CELIX_SUCCESS) {
                                        bool release = true;
                                        pubsub_multipart_callbacks_t 
mp_callbacks;
-                                       mp_callbacks.handle = sub;
+                                       mp_callbacks.handle = map;
                                        mp_callbacks.localMsgTypeIdForMsgType = 
pubsub_localMsgTypeIdForMsgType;
                                        mp_callbacks.getMultipart = NULL;
 
-                                       subsvc->receive(subsvc->handle, name, 
msg->header.type, msgInst, &mp_callbacks, &release);
-                                       if(release){
-                                               
sub->serializerSvc->freeMsg(sub->serializerSvc->serializer, msgType, msgInst);
+                                       subsvc->receive(subsvc->handle, 
msgSer->msgName, msg->header.type, msgInst, &mp_callbacks, &release);
+                                       if (release) {
+                        msgSer->freeMsg(msgSer->handle, msgInst);
                                        }
                                }
                                else{
-                                       printf("TS: Cannot deserialize msgType 
%s.\n",name);
+                                       printf("TS: Cannot deserialize msgType 
%s.\n", msgSer->msgName);
                                }
 
                        }
-                       else{
+                       else {
                                int major=0,minor=0;
-                               version_getMajor(msgVersion,&major);
-                               version_getMinor(msgVersion,&minor);
-                               printf("TS: Version mismatch for primary 
message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole 
message.\n",name,major,minor,msg->header.major,msg->header.minor);
+                               version_getMajor(msgSer->msgVersion, &major);
+                               version_getMinor(msgSer->msgVersion, &minor);
+                               printf("TS: Version mismatch for primary 
message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole 
message.\n",
+                       msgSer->msgName, major, minor, msg->header.major, 
msg->header.minor);
                        }
-
                }
        }
-       hashMapIterator_destroy(iter);
+       celixThreadMutex_unlock(&sub->ts_lock);
 }
 
 static void* udp_recv_thread_func(void * arg) {
@@ -555,7 +537,7 @@ static void* udp_recv_thread_func(void * arg) {
             unsigned int size;
             if(largeUdp_dataAvailable(sub->largeUdpHandle, events[i].data.fd, 
&index, &size) == true) {
                 // Handle data
-                pubsub_udp_msg_pt udpMsg = NULL;
+                pubsub_udp_msg_t* udpMsg = NULL;
                 if(largeUdp_read(sub->largeUdpHandle, index, (void**)&udpMsg, 
size) != 0) {
                        printf("TS: ERROR largeUdp_read with index %d\n", 
index);
                        continue;
@@ -577,19 +559,19 @@ static void* udp_recv_thread_func(void * arg) {
 }
 
 
-static void sigusr1_sighandler(int signo){
+static void sigusr1_sighandler(int signo) {
        printf("TS: Topic subscription being shut down...\n");
        return;
 }
 
-static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){
+static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr) {
        bool check=false;
        int major=0,minor=0;
 
-       if(msgVersion!=NULL){
+       if (msgVersion!=NULL) {
                version_getMajor(msgVersion,&major);
                version_getMinor(msgVersion,&minor);
-               if(hdr->major==((unsigned char)major)){ /* Different major 
means incompatible */
+               if (hdr->major==((unsigned char)major)) { /* Different major 
means incompatible */
                        check = (hdr->minor>=((unsigned char)minor)); /* 
Compatible only if the provider has a minor equals or greater (means compatible 
update) */
                }
        }
@@ -597,8 +579,24 @@ static bool checkVersion(version_pt 
msgVersion,pubsub_msg_header_pt hdr){
        return check;
 }
 
-static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, 
unsigned int* msgTypeId){
-       *msgTypeId = utils_stringHash(msgType);
-       return 0;
+static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, 
unsigned int* out) {
+    pubsub_msg_serializer_map_t* map = handle;
+    hash_map_iterator_t iter = hashMapIterator_construct(map->serializers);
+    unsigned int msgTypeId = 0;
+    while (hashMapIterator_hasNext(&iter)) {
+        pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter);
+        if (strncmp(msgSer->msgName, msgType, 1024 * 1024) == 0) {
+            msgTypeId = msgSer->msgId;
+            break;
+        }
+    }
+
+    if (msgTypeId == 0) {
+        printf("Cannot find msg type id for msgType %s\n", msgType);
+        return -1;
+    } else {
+        *out = msgTypeId;
+        return 0;
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/CMakeLists.txt 
b/pubsub/pubsub_admin_zmq/CMakeLists.txt
index 956830d..49eba87 100644
--- a/pubsub/pubsub_admin_zmq/CMakeLists.txt
+++ b/pubsub/pubsub_admin_zmq/CMakeLists.txt
@@ -52,7 +52,6 @@ if (BUILD_PUBSUB_PSA_ZMQ)
                ${ZMQ_CRYPTO_C}
                
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
                
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
-               
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/dyn_msg_utils.c
                
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
        )
 

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h 
b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
index 2f81bff..3c36986 100644
--- a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
+++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
@@ -49,7 +49,7 @@
 
 struct pubsub_admin {
 
-       pubsub_serializer_service_pt serializerSvc;
+       pubsub_serializer_service_t* serializerSvc;
 
        bundle_context_pt bundle_context;
        log_helper_pt loghelper;
@@ -89,7 +89,7 @@ celix_status_t 
pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* sco
 celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, 
pubsub_endpoint_pt pubEP, double* score);
 celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, 
pubsub_endpoint_pt subEP, double* score);
 
-celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_t* serializerSvc);
 
 #endif /* PUBSUB_ADMIN_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
----------------------------------------------------------------------
diff --git 
a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h 
b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
index b6b76c6..158dfe7 100644
--- a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
+++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
@@ -34,14 +34,14 @@
 
 typedef struct topic_publication *topic_publication_pt;
 
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt 
bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt 
serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, 
topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt 
bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* 
serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, 
topic_publication_pt *out);
 celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
 
 celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep);
 celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep);
 
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, 
pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt 
pub, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, 
pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt 
pub, pubsub_serializer_service_t* serializerSvc);
 
 celix_status_t pubsub_topicPublicationStart(bundle_context_pt 
bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
 celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h 
b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
index c6fe93a..1fbbaaf 100644
--- a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
+++ b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
@@ -38,7 +38,7 @@
 
 typedef struct topic_subscription* topic_subscription_pt;
 
-celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt 
bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_pt 
serializer, char* scope, char* topic,topic_subscription_pt* out);
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt 
bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_t* 
serializer, char* scope, char* topic,topic_subscription_pt* out);
 celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
@@ -51,8 +51,8 @@ celix_status_t 
pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
 celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, 
pubsub_endpoint_pt subEP);
 celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt 
ts, pubsub_endpoint_pt subEP);
 
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, 
pubsub_serializer_service_pt serializerSvc);
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt 
ts, pubsub_serializer_service_pt serializerSvc);
+celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, 
pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt 
ts, pubsub_serializer_service_t* serializerSvc);
 
 celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt 
subscription);
 celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt 
subscription);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c 
b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
index 09fcd8c..5c9a5d5 100644
--- a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
@@ -666,7 +666,7 @@ celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt 
admin, pubsub_endpoin
        return status;
 }
 
-celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_t* serializerSvc){
        celix_status_t status = CELIX_SUCCESS;
        admin->serializerSvc = serializerSvc;
 
@@ -694,7 +694,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt 
admin, pubsub_serialize
        return status;
 }
 
-celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_t* serializerSvc){
        celix_status_t status = CELIX_SUCCESS;
        admin->serializerSvc = NULL;
 

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c 
b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
index bb8ff56..2e95874 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
@@ -47,7 +47,6 @@
 #include "version.h"
 
 #include "pubsub_common.h"
-#include "dyn_msg_utils.h"
 #include "pubsub_utils.h"
 #include "publisher.h"
 
@@ -70,7 +69,7 @@ struct topic_publication {
        array_list_pt pub_ep_list; //List<pubsub_endpoint>
        hash_map_pt boundServices; //<bundle_pt,bound_service>
        celix_thread_mutex_t tp_lock;
-       pubsub_serializer_service_pt serializerSvc;
+       pubsub_serializer_service_t* serializerSvc;
 };
 
 typedef struct publish_bundle_bound_service {
@@ -78,26 +77,26 @@ typedef struct publish_bundle_bound_service {
        pubsub_publisher_pt service;
        bundle_pt bundle;
        char *topic;
-       hash_map_pt msgTypes;
+       pubsub_msg_serializer_map_t* map;
        unsigned short getCount;
        celix_thread_mutex_t mp_lock;
        bool mp_send_in_progress;
        array_list_pt mp_parts;
-}* publish_bundle_bound_service_pt;
+} publish_bundle_bound_service_t;
 
-typedef struct pubsub_msg{
+typedef struct pubsub_msg {
        pubsub_msg_header_pt header;
        char* payload;
        int payloadSize;
-}* pubsub_msg_pt;
+} pubsub_msg_t;
 
 static unsigned int rand_range(unsigned int min, unsigned int max);
 
 static celix_status_t pubsub_topicPublicationGetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service);
 static celix_status_t pubsub_topicPublicationUngetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service);
 
-static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle);
-static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt 
boundSvc);
+static publish_bundle_bound_service_t* 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle);
+static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* 
boundSvc);
 
 static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, 
const void *msg);
 static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int 
msgTypeId, const void *msg, int flags);
@@ -105,7 +104,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const 
char* msgType, unsig
 
 static void delay_first_send_for_late_joiners(void);
 
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, 
pubsub_endpoint_pt pubEP, pubsub_serializer_service_pt serializer, char* 
bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, 
pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* 
bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
        celix_status_t status = CELIX_SUCCESS;
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -235,7 +234,7 @@ celix_status_t 
pubsub_topicPublicationDestroy(topic_publication_pt pub){
 
        hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices);
        while(hashMapIterator_hasNext(iter)){
-               publish_bundle_bound_service_pt bound = 
hashMapIterator_nextValue(iter);
+               publish_bundle_bound_service_t* bound = 
hashMapIterator_nextValue(iter);
                pubsub_destroyPublishBundleBoundService(bound);
        }
        hashMapIterator_destroy(iter);
@@ -332,43 +331,50 @@ celix_status_t 
pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub
        return CELIX_SUCCESS;
 }
 
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, 
pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, 
pubsub_serializer_service_t* serializerSvc){
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&(pub->tp_lock));
 
-       pub->serializerSvc = serializerSvc;
-
-       hash_map_iterator_pt bs_iter = 
hashMapIterator_create(pub->boundServices);
-       while(hashMapIterator_hasNext(bs_iter)){
-               publish_bundle_bound_service_pt boundSvc = 
(publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter);
-               if (hashMap_size(boundSvc->msgTypes) == 0){
-                       
pub->serializerSvc->fillMsgTypesMap(pub->serializerSvc->serializer, 
boundSvc->msgTypes, boundSvc->bundle);
+       //clearing previous serializer
+       if (pub->serializerSvc != NULL) {
+               hash_map_iterator_t iter = 
hashMapIterator_construct(pub->boundServices);
+               while (hashMapIterator_hasNext(&iter)) {
+                       publish_bundle_bound_service_t* bound = 
hashMapIterator_nextValue(&iter);
+                       
pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, 
bound->map);
+                       bound->map = NULL;
                }
        }
-       hashMapIterator_destroy(bs_iter);
+
+       pub->serializerSvc = serializerSvc;
+       hash_map_iterator_t iter = 
hashMapIterator_construct(pub->boundServices);
+       while (hashMapIterator_hasNext(&iter)) {
+               hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+               bundle_pt bundle = hashMapEntry_getKey(entry);
+               publish_bundle_bound_service_t* boundSvc = 
hashMapEntry_getValue(entry);
+               
pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, 
&boundSvc->map);
+       }
 
        celixThreadMutex_unlock(&(pub->tp_lock));
 
        return status;
 }
 
-celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt 
pub, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt 
pub, pubsub_serializer_service_t* svc){
        celix_status_t status = CELIX_SUCCESS;
-
        celixThreadMutex_lock(&(pub->tp_lock));
 
-       hash_map_iterator_pt bs_iter = 
hashMapIterator_create(pub->boundServices);
-       while(hashMapIterator_hasNext(bs_iter)){
-               publish_bundle_bound_service_pt boundSvc = 
(publish_bundle_bound_service_pt) hashMapIterator_nextValue(bs_iter);
-               
pub->serializerSvc->emptyMsgTypesMap(pub->serializerSvc->serializer, 
boundSvc->msgTypes);
+       if (pub->serializerSvc == svc) {
+               hash_map_iterator_t iter = 
hashMapIterator_construct(pub->boundServices);
+               while (hashMapIterator_hasNext(&iter)) {
+                       publish_bundle_bound_service_t *boundSvc = 
hashMapIterator_nextValue(&iter);
+                       
pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, 
boundSvc->map);
+                       boundSvc->map = NULL;
+               }
+               pub->serializerSvc = NULL;
        }
-       hashMapIterator_destroy(bs_iter);
-
-       pub->serializerSvc = NULL;
 
        celixThreadMutex_unlock(&(pub->tp_lock));
-
        return status;
 }
 
@@ -384,7 +390,7 @@ static celix_status_t 
pubsub_topicPublicationGetService(void* handle, bundle_pt
 
        celixThreadMutex_lock(&(publish->tp_lock));
 
-       publish_bundle_bound_service_pt bound = 
(publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
+       publish_bundle_bound_service_t* bound = 
hashMap_get(publish->boundServices,bundle);
        if(bound==NULL){
                bound = pubsub_createPublishBundleBoundService(publish,bundle);
                if(bound!=NULL){
@@ -410,7 +416,7 @@ static celix_status_t 
pubsub_topicPublicationUngetService(void* handle, bundle_p
 
        celixThreadMutex_lock(&(publish->tp_lock));
 
-       publish_bundle_bound_service_pt bound = 
(publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
+       publish_bundle_bound_service_t* bound = 
hashMap_get(publish->boundServices,bundle);
        if(bound!=NULL){
 
                bound->getCount--;
@@ -434,7 +440,7 @@ static celix_status_t 
pubsub_topicPublicationUngetService(void* handle, bundle_p
        return CELIX_SUCCESS;
 }
 
-static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_pt msg, bool last){
+static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_t* msg, bool last){
 
        bool ret = true;
 
@@ -474,7 +480,7 @@ static bool send_pubsub_mp_msg(zsock_t* zmq_socket, 
array_list_pt mp_msg_parts){
        unsigned int i = 0;
        unsigned int mp_num = arrayList_size(mp_msg_parts);
        for(;i<mp_num;i++){
-               ret = ret && send_pubsub_msg(zmq_socket, 
(pubsub_msg_pt)arrayList_get(mp_msg_parts,i), (i==mp_num-1));
+               ret = ret && send_pubsub_msg(zmq_socket, 
(pubsub_msg_t*)arrayList_get(mp_msg_parts,i), (i==mp_num-1));
        }
        arrayList_clear(mp_msg_parts);
 
@@ -489,10 +495,8 @@ static int pubsub_topicPublicationSend(void* handle, 
unsigned int msgTypeId, con
 }
 
 static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int 
msgTypeId, const void *msg, int flags){
-
        int status = 0;
-
-       publish_bundle_bound_service_pt bound = 
(publish_bundle_bound_service_pt) handle;
+       publish_bundle_bound_service_t* bound = handle;
 
        celixThreadMutex_lock(&(bound->mp_lock));
        if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & 
PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg
@@ -501,38 +505,33 @@ static int pubsub_topicPublicationSendMultipart(void 
*handle, unsigned int msgTy
                return -3;
        }
 
-       pubsub_message_type *msgType = hashMap_get(bound->msgTypes, &msgTypeId);
-
+       pubsub_msg_serializer_t* msgSer = NULL;
+       if (bound->map != NULL) {
+               msgSer = hashMap_get(bound->map->serializers, 
(void*)(uintptr_t)msgTypeId);
+       }
        int major=0, minor=0;
 
-       if (msgType != NULL && bound->parent->serializerSvc != NULL) {
-
-               version_pt msgVersion = 
bound->parent->serializerSvc->getVersion(bound->parent->serializerSvc->serializer,
 msgType);
-
+       if (msgSer != NULL) {
                pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct 
pubsub_msg_header));
-
                strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
-
                msg_hdr->type = msgTypeId;
-               if (msgVersion != NULL){
-                       version_getMajor(msgVersion, &major);
-                       version_getMinor(msgVersion, &minor);
+               if (msgSer->msgVersion != NULL){
+                       version_getMajor(msgSer->msgVersion, &major);
+                       version_getMinor(msgSer->msgVersion, &minor);
                        msg_hdr->major = major;
                        msg_hdr->minor = minor;
                }
 
-               void* serializedOutput = NULL;
-               int serializedOutputLen = 0;
-               
bound->parent->serializerSvc->serialize(bound->parent->serializerSvc->serializer,
 msgType, msg, &serializedOutput, &serializedOutputLen);
-
-               pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg));
+               char* serializedOutput = NULL;
+               size_t serializedOutputLen = 0;
+               msgSer->serialize(msgSer->handle, msg, &serializedOutput, 
&serializedOutputLen);
+               pubsub_msg_t* msg = calloc(1,sizeof(struct pubsub_msg));
                msg->header = msg_hdr;
                msg->payload = (char *) serializedOutput;
                msg->payloadSize = serializedOutputLen;
-
                bool snd = true;
 
-               switch(flags){
+               switch (flags) {
                case PUBSUB_PUBLISHER_FIRST_MSG:
                        bound->mp_send_in_progress = true;
                        arrayList_add(bound->mp_parts,msg);
@@ -602,9 +601,9 @@ static unsigned int rand_range(unsigned int min, unsigned 
int max){
 
 }
 
-static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle){
+static publish_bundle_bound_service_t* 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle){
 
-       publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
+       publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound));
 
        if (bound != NULL) {
                bound->service = calloc(1, sizeof(*bound->service));
@@ -617,7 +616,11 @@ static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(to
                bound->getCount = 1;
                bound->mp_send_in_progress = false;
                celixThreadMutex_create(&bound->mp_lock,NULL);
-               bound->msgTypes = hashMap_create(uintHash, NULL, uintEquals, 
NULL); //<int* (msgId),pubsub_message_type>
+
+               //TODO check if lock is needed. e.g. was the caller already 
locked?
+               if (tp->serializerSvc != NULL) {
+                       
tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, bundle, 
&bound->map);
+               }
                arrayList_create(&bound->mp_parts);
 
                pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
@@ -628,10 +631,6 @@ static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(to
                bound->service->send = pubsub_topicPublicationSend;
                bound->service->sendMultipart = 
pubsub_topicPublicationSendMultipart;
 
-               if (tp->serializerSvc != NULL){
-                       
tp->serializerSvc->fillMsgTypesMap(tp->serializerSvc->serializer, 
bound->msgTypes,bound->bundle);
-               }
-
        }
        else
        {
@@ -645,26 +644,24 @@ static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(to
        return bound;
 }
 
-static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt 
boundSvc){
+static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* 
boundSvc){
 
        celixThreadMutex_lock(&boundSvc->mp_lock);
 
-       if(boundSvc->service != NULL){
+       if (boundSvc->service != NULL) {
                free(boundSvc->service);
        }
 
-       if(boundSvc->msgTypes != NULL){
-               if (boundSvc->parent->serializerSvc != NULL){
-                       
boundSvc->parent->serializerSvc->emptyMsgTypesMap(boundSvc->parent->serializerSvc->serializer,
 boundSvc->msgTypes);
-               }
-               hashMap_destroy(boundSvc->msgTypes,false,false);
+       if (boundSvc->map != NULL && boundSvc->parent->serializerSvc != NULL) {
+               
boundSvc->parent->serializerSvc->destroySerializerMap(boundSvc->parent->serializerSvc->handle,
 boundSvc->map);
+               boundSvc->map = NULL;
        }
 
-       if(boundSvc->mp_parts!=NULL){
+       if (boundSvc->mp_parts!=NULL) {
                arrayList_destroy(boundSvc->mp_parts);
        }
 
-       if(boundSvc->topic!=NULL){
+       if (boundSvc->topic!=NULL) {
                free(boundSvc->topic);
        }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c 
b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
index 3de56af..7ef2c5d 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
@@ -46,7 +46,6 @@
 
 #include "subscriber.h"
 #include "publisher.h"
-#include "dyn_msg_utils.h"
 #include "pubsub_utils.h"
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -58,8 +57,7 @@
 #define POLL_TIMEOUT   250
 #define ZMQ_POLL_TIMEOUT_MS_ENV        "ZMQ_POLL_TIMEOUT_MS"
 
-struct topic_subscription{
-
+struct topic_subscription {
        zsock_t* zmq_socket;
        zcert_t * zmq_cert;
        zcert_t * zmq_pub_cert;
@@ -71,26 +69,25 @@ struct topic_subscription{
        celix_thread_mutex_t ts_lock;
        bundle_context_pt context;
 
-       hash_map_pt servicesMap; // key = service, value = msg types map
+       hash_map_pt msgSerializers; // key = service ptr, value = 
pubsub_msg_serializer_map_t*
        array_list_pt pendingConnections;
        array_list_pt pendingDisconnections;
 
        celix_thread_mutex_t pendingConnections_lock;
        celix_thread_mutex_t pendingDisconnections_lock;
        unsigned int nrSubscribers;
-       pubsub_serializer_service_pt serializerSvc;
-
+       pubsub_serializer_service_t* serializerSvc;
 };
 
-typedef struct complete_zmq_msg{
+typedef struct complete_zmq_msg {
        zframe_t* header;
        zframe_t* payload;
 }* complete_zmq_msg_pt;
 
-typedef struct mp_handle{
-       hash_map_pt svc_msg_db;
+typedef struct mp_handle {
+       pubsub_msg_serializer_map_t* map;
        hash_map_pt rcv_msg_map;
-}* mp_handle_pt;
+} mp_handle_t;
 
 typedef struct msg_map_entry{
        bool retain;
@@ -104,12 +101,12 @@ static bool checkVersion(version_pt 
msgVersion,pubsub_msg_header_pt hdr);
 static void sigusr1_sighandler(int signo);
 static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, 
unsigned int* msgTypeId);
 static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool 
retain, void **part);
-static mp_handle_pt create_mp_handle(topic_subscription_pt sub, hash_map_pt 
svc_msg_db,array_list_pt rcv_msg_list);
-static void destroy_mp_handle(mp_handle_pt mp_handle);
+static mp_handle_t* create_mp_handle(topic_subscription_pt sub, 
pubsub_msg_serializer_map_t* map, array_list_pt rcv_msg_list);
+static void destroy_mp_handle(mp_handle_t* mp_handle);
 static void connectPendingPublishers(topic_subscription_pt sub);
 static void disconnectPendingPublishers(topic_subscription_pt sub);
 
-celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt 
bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_pt 
serializer, char* scope, char* topic,topic_subscription_pt* out){
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt 
bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_t* 
serializer, char* scope, char* topic,topic_subscription_pt* out){
        celix_status_t status = CELIX_SUCCESS;
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -223,7 +220,7 @@ celix_status_t 
pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,
        celixThreadMutex_create(&ts->socket_lock, NULL);
        celixThreadMutex_create(&ts->ts_lock,NULL);
        arrayList_create(&ts->sub_ep_list);
-       ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL);
+       ts->msgSerializers = hashMap_create(NULL, NULL, NULL, NULL);
        arrayList_create(&ts->pendingConnections);
        arrayList_create(&ts->pendingDisconnections);
        celixThreadMutex_create(&ts->pendingConnections_lock, NULL);
@@ -269,7 +266,7 @@ celix_status_t 
pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
        serviceTracker_destroy(ts->tracker);
        arrayList_clear(ts->sub_ep_list);
        arrayList_destroy(ts->sub_ep_list);
-       hashMap_destroy(ts->servicesMap,false,false);
+       hashMap_destroy(ts->msgSerializers,false,false);
 
        celixThreadMutex_lock(&ts->pendingConnections_lock);
        arrayList_destroy(ts->pendingConnections);
@@ -429,7 +426,7 @@ unsigned int 
pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
        return ts->nrSubscribers;
 }
 
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, 
pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, 
pubsub_serializer_service_t* serializerSvc){
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&ts->ts_lock);
@@ -441,22 +438,18 @@ celix_status_t 
pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, p
        return status;
 }
 
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt 
ts, pubsub_serializer_service_pt serializerSvc){
+celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt 
ts, pubsub_serializer_service_t* svc){
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&ts->ts_lock);
-
-       hash_map_iterator_pt svc_iter = hashMapIterator_create(ts->servicesMap);
-       while(hashMapIterator_hasNext(svc_iter)){
-               hash_map_pt msgTypes = (hash_map_pt) 
hashMapIterator_nextValue(svc_iter);
-               if (hashMap_size(msgTypes) > 0){
-                       
ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes);
+       if (ts->serializerSvc == svc) {
+               hash_map_iterator_t iter = 
hashMapIterator_construct(ts->msgSerializers);
+               while(hashMapIterator_hasNext(&iter)){
+                       pubsub_msg_serializer_map_t* map = 
hashMapIterator_nextValue(&iter);
+                       
ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
                }
        }
-       hashMapIterator_destroy(svc_iter);
-
        ts->serializerSvc = NULL;
-
        celixThreadMutex_unlock(&ts->ts_lock);
 
        return status;
@@ -467,24 +460,19 @@ static celix_status_t topicsub_subscriberTracked(void * 
handle, service_referenc
        topic_subscription_pt ts = handle;
 
        celixThreadMutex_lock(&ts->ts_lock);
-       if (!hashMap_containsKey(ts->servicesMap, service)) {
-               hash_map_pt msgTypes = hashMap_create(uintHash, NULL, 
uintEquals, NULL); //key = msgId, value = pubsub_message_type
-
+       if (!hashMap_containsKey(ts->msgSerializers, service)) {
                bundle_pt bundle = NULL;
                serviceReference_getBundle(reference, &bundle);
 
-               if (ts->serializerSvc != NULL){
-                       
ts->serializerSvc->fillMsgTypesMap(ts->serializerSvc->serializer, 
msgTypes,bundle);
-               }
-
-               if(hashMap_size(msgTypes)==0){ //If the msgTypes hashMap is not 
filled, the service is an unsupported subscriber
-                       hashMap_destroy(msgTypes,false,false);
-                       printf("TS: Unsupported subscriber!\n");
-               }
-               else{
-                       hashMap_put(ts->servicesMap, service, msgTypes);
+               if (ts->serializerSvc != NULL) {
+                       pubsub_msg_serializer_map_t* map = NULL;
+                       
ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map);
+                       if (map != NULL) {
+                               hashMap_put(ts->msgSerializers, service, map);
+                       } else {
+                               printf("TS: Cannot create msg serializer 
map\n");
+                       }
                }
-
        }
        celixThreadMutex_unlock(&ts->ts_lock);
        printf("TS: New subscriber registered.\n");
@@ -497,14 +485,9 @@ static celix_status_t topicsub_subscriberUntracked(void * 
handle, service_refere
        topic_subscription_pt ts = handle;
 
        celixThreadMutex_lock(&ts->ts_lock);
-       if (hashMap_containsKey(ts->servicesMap, service)) {
-               hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service);
-               if(msgTypes!=NULL){
-                       if (ts->serializerSvc != NULL){
-                               
ts->serializerSvc->emptyMsgTypesMap(ts->serializerSvc->serializer, msgTypes);
-                       }
-                       hashMap_destroy(msgTypes,false,false);
-               }
+       if (hashMap_containsKey(ts->msgSerializers, service)) {
+               pubsub_msg_serializer_map_t* map = 
hashMap_remove(ts->msgSerializers, service);
+        ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, 
map);
        }
        celixThreadMutex_unlock(&ts->ts_lock);
 
@@ -513,66 +496,55 @@ static celix_status_t topicsub_subscriberUntracked(void * 
handle, service_refere
 }
 
 
-static void process_msg(topic_subscription_pt sub,array_list_pt msg_list){
+static void process_msg(topic_subscription_pt sub, array_list_pt msg_list) {
 
        pubsub_msg_header_pt first_msg_hdr = 
(pubsub_msg_header_pt)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->header);
 
-       hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap);
-       while (hashMapIterator_hasNext(iter)) {
-               hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+       hash_map_iterator_t iter = 
hashMapIterator_construct(sub->msgSerializers);
+       while (hashMapIterator_hasNext(&iter)) {
+               hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
                pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry);
-               hash_map_pt msgTypes = hashMapEntry_getValue(entry);
+               pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
 
-               pubsub_message_type *msgType = 
hashMap_get(msgTypes,&(first_msg_hdr->type));
-               if (msgType == NULL) {
-                       printf("TS: Primary message %d not supported. NOT 
sending any part of the whole message.\n",first_msg_hdr->type);
-               }
-               else if (sub->serializerSvc == NULL){
-                       printf("TS: No active serializer found!\n");
-               }
-               else{
+               pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, 
(void*)(uintptr_t )first_msg_hdr->type);
+               if (msgSer == NULL) {
+                       printf("TS: Primary message %d not supported. NOT 
sending any part of the whole message.\n", first_msg_hdr->type);
+               } else {
                        void *msgInst = NULL;
-                       char *name = 
sub->serializerSvc->getName(sub->serializerSvc->serializer, msgType);
-                       version_pt msgVersion = 
sub->serializerSvc->getVersion(sub->serializerSvc->serializer, msgType);
-
-                       bool validVersion = 
checkVersion(msgVersion,first_msg_hdr);
-
+                       bool validVersion = checkVersion(msgSer->msgVersion, 
first_msg_hdr);
                        if(validVersion){
-
-                               celix_status_t status = 
sub->serializerSvc->deserialize(sub->serializerSvc->serializer, msgType, (const 
void *) zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), 
&msgInst);
+                               celix_status_t status = 
msgSer->deserialize(msgSer->handle, (const 
char*)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), 
0, &msgInst);
 
                                if (status == CELIX_SUCCESS) {
                                        bool release = true;
 
-                                       mp_handle_pt mp_handle = 
create_mp_handle(sub, msgTypes,msg_list);
+                                       mp_handle_t* mp_handle = 
create_mp_handle(sub, map, msg_list);
                                        pubsub_multipart_callbacks_t 
mp_callbacks;
                                        mp_callbacks.handle = mp_handle;
                                        mp_callbacks.localMsgTypeIdForMsgType = 
pubsub_localMsgTypeIdForMsgType;
                                        mp_callbacks.getMultipart = 
pubsub_getMultipart;
-                                       subsvc->receive(subsvc->handle, name, 
first_msg_hdr->type, msgInst, &mp_callbacks, &release);
+                                       subsvc->receive(subsvc->handle, 
msgSer->msgName, first_msg_hdr->type, msgInst, &mp_callbacks, &release);
 
-                                       if(release){
-                                               
sub->serializerSvc->freeMsg(sub->serializerSvc->serializer, msgType, msgInst);
+                                       if (release) {
+                                               msgSer->freeMsg(msgSer->handle, 
msgInst);
                                        }
-                                       if(mp_handle!=NULL){
+                                       if (mp_handle!=NULL) {
                                                destroy_mp_handle(mp_handle);
                                        }
                                }
                                else{
-                                       printf("TS: Cannot deserialize msgType 
%s.\n",name);
+                                       printf("TS: Cannot deserialize msgType 
%s.\n", msgSer->msgName);
                                }
 
-                       }
-                       else{
+                       } else {
                                int major=0,minor=0;
-                               version_getMajor(msgVersion,&major);
-                               version_getMinor(msgVersion,&minor);
-                               printf("TS: Version mismatch for primary 
message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole 
message.\n",name,major,minor,first_msg_hdr->major,first_msg_hdr->minor);
+                               version_getMajor(msgSer->msgVersion, &major);
+                               version_getMinor(msgSer->msgVersion, &minor);
+                               printf("TS: Version mismatch for primary 
message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole 
message.\n",
+                                          msgSer->msgName, major, minor, 
first_msg_hdr->major, first_msg_hdr->minor);
                        }
-
                }
        }
-       hashMapIterator_destroy(iter);
 
        int i = 0;
        for(;i<arrayList_size(msg_list);i++){
@@ -737,7 +709,7 @@ static int pubsub_getMultipart(void *handle, unsigned int 
msgTypeId, bool retain
                return -1;
        }
 
-       mp_handle_pt mp_handle = (mp_handle_pt)handle;
+       mp_handle_t* mp_handle = handle;
        msg_map_entry_pt entry = hashMap_get(mp_handle->rcv_msg_map,&msgTypeId);
        if(entry!=NULL){
                entry->retain = retain;
@@ -753,63 +725,55 @@ static int pubsub_getMultipart(void *handle, unsigned int 
msgTypeId, bool retain
 
 }
 
-static mp_handle_pt create_mp_handle(topic_subscription_pt sub, hash_map_pt 
svc_msg_db,array_list_pt rcv_msg_list){
+static mp_handle_t* create_mp_handle(topic_subscription_pt sub, 
pubsub_msg_serializer_map_t* map, array_list_pt rcv_msg_list) {
 
        if(arrayList_size(rcv_msg_list)==1){ //Means it's not a multipart 
message
                return NULL;
        }
 
-       mp_handle_pt mp_handle = calloc(1,sizeof(struct mp_handle));
-       mp_handle->svc_msg_db = svc_msg_db;
-       mp_handle->rcv_msg_map = hashMap_create(uintHash, NULL, uintEquals, 
NULL);
-
-       int i=1; //We skip the first message, it will be handle differently
-       for(;i<arrayList_size(rcv_msg_list);i++){
-               complete_zmq_msg_pt c_msg = 
(complete_zmq_msg_pt)arrayList_get(rcv_msg_list,i);
+       mp_handle_t* mp_handle = calloc(1,sizeof(struct mp_handle));
+       mp_handle->map = map;
+       mp_handle->rcv_msg_map = hashMap_create(NULL, NULL, NULL, NULL);
 
+       int i; //We skip the first message, it will be handled differently
+       for (i=1 ; i<arrayList_size(rcv_msg_list) ; i++) {
+               complete_zmq_msg_pt c_msg = arrayList_get(rcv_msg_list,i);
                pubsub_msg_header_pt header = 
(pubsub_msg_header_pt)zframe_data(c_msg->header);
 
-               pubsub_message_type *msgType = 
hashMap_get(svc_msg_db,&(header->type));
-               if (msgType != NULL && sub->serializerSvc != NULL) {
+               pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, 
(void*)(uintptr_t)(header->type));
+               if (msgSer != NULL) {
                        void *msgInst = NULL;
-                       version_pt msgVersion = 
sub->serializerSvc->getVersion(sub->serializerSvc->serializer, msgType);
-
-                       bool validVersion = checkVersion(msgVersion,header);
-
+                       bool validVersion = checkVersion(msgSer->msgVersion, 
header);
                        if(validVersion){
-                               celix_status_t status = 
sub->serializerSvc->deserialize(sub->serializerSvc->serializer, msgType, (const 
void *) zframe_data(c_msg->payload), &msgInst);
+                               //TODO make the getMultipart lazy?
+                               celix_status_t status = 
msgSer->deserialize(msgSer->handle, (const char*)zframe_data(c_msg->payload), 
0, &msgInst);
 
                                if(status == CELIX_SUCCESS){
-                                       unsigned int* msgId = 
calloc(1,sizeof(unsigned int));
-                                       *msgId = header->type;
                                        msg_map_entry_pt entry = 
calloc(1,sizeof(struct msg_map_entry));
                                        entry->msgInst = msgInst;
-                                       
hashMap_put(mp_handle->rcv_msg_map,msgId,entry);
+                                       hashMap_put(mp_handle->rcv_msg_map, 
(void*)(uintptr_t)(header->type), entry);
                                }
                        }
                }
-
        }
-
        return mp_handle;
-
 }
 
-static void destroy_mp_handle(mp_handle_pt mp_handle){
+static void destroy_mp_handle(mp_handle_t* mp_handle){
 
        hash_map_iterator_pt iter = 
hashMapIterator_create(mp_handle->rcv_msg_map);
        while(hashMapIterator_hasNext(iter)){
                hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
-               unsigned int* msgId = (unsigned int*)hashMapEntry_getKey(entry);
+               unsigned int msgId = (unsigned 
int)(uintptr_t)hashMapEntry_getKey(entry);
                msg_map_entry_pt msgEntry = hashMapEntry_getValue(entry);
-               pubsub_message_type* msgType = 
hashMap_get(mp_handle->svc_msg_db,msgId);
-               if(msgType!=NULL){
-                       if(!msgEntry->retain){
-                               free(msgEntry->msgInst);
+               pubsub_msg_serializer_t* msgSer = 
hashMap_get(mp_handle->map->serializers, (void*)(uintptr_t)msgId);
+               if (msgSer != NULL) {
+                       if (!msgEntry->retain) {
+                               msgSer->freeMsg(msgSer->handle, 
msgEntry->msgInst);
                        }
                }
                else{
-                       printf("TS: ERROR: Cannot find pubsub_message_type for 
msg %u, so cannot destroy it!\n",*msgId);
+                       printf("TS: ERROR: Cannot find pubsub_message_type for 
msg %u, so cannot destroy it!\n", msgId);
                }
        }
        hashMapIterator_destroy(iter);

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_common/public/include/dyn_msg_utils.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/dyn_msg_utils.h 
b/pubsub/pubsub_common/public/include/dyn_msg_utils.h
deleted file mode 100644
index 71085ab..0000000
--- a/pubsub/pubsub_common/public/include/dyn_msg_utils.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * dyn_msg_utils.h
- *
- *  \date       Nov 11, 2015
- *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
- *  \copyright Apache License, Version 2.0
- */
-
-#ifndef DYN_MSG_UTILS_H_
-#define DYN_MSG_UTILS_H_
-
-#include "bundle.h"
-#include "hash_map.h"
-
-unsigned int uintHash(const void * uintNum);
-int uintEquals(const void * uintNum, const void * toCompare);
-
-void fillMsgTypesMap(hash_map_pt msgTypesMap,bundle_pt bundle);
-void emptyMsgTypesMap(hash_map_pt msgTypesMap);
-
-#endif /* DYN_MSG_UTILS_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_common/public/include/pubsub_admin.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_admin.h 
b/pubsub/pubsub_common/public/include/pubsub_admin.h
index 52cb75c..f7ab7e0 100644
--- a/pubsub/pubsub_common/public/include/pubsub_admin.h
+++ b/pubsub/pubsub_common/public/include/pubsub_admin.h
@@ -56,8 +56,8 @@ struct pubsub_admin_service {
        celix_status_t (*matchPublisher)(pubsub_admin_pt admin, 
pubsub_endpoint_pt pubEP, double* score);
        celix_status_t (*matchSubscriber)(pubsub_admin_pt admin, 
pubsub_endpoint_pt subEP, double* score);
 
-       celix_status_t (*setSerializer)(pubsub_admin_pt admin, 
pubsub_serializer_service_pt serializerSvc);
-       celix_status_t (*removeSerializer)(pubsub_admin_pt admin, 
pubsub_serializer_service_pt serializerSvc);
+       celix_status_t (*setSerializer)(pubsub_admin_pt admin, 
pubsub_serializer_service_t* serializerSvc);
+       celix_status_t (*removeSerializer)(pubsub_admin_pt admin, 
pubsub_serializer_service_t* serializerSvc);
 
 };
 

http://git-wip-us.apache.org/repos/asf/celix/blob/7efe4331/pubsub/pubsub_common/public/include/pubsub_serializer.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_common/public/include/pubsub_serializer.h 
b/pubsub/pubsub_common/public/include/pubsub_serializer.h
index f2df075..e9f9f6c 100644
--- a/pubsub/pubsub_common/public/include/pubsub_serializer.h
+++ b/pubsub/pubsub_common/public/include/pubsub_serializer.h
@@ -24,33 +24,44 @@
  *  \copyright Apache License, Version 2.0
  */
 
-#ifndef PUBSUB_SERIALIZER_H_
-#define PUBSUB_SERIALIZER_H_
+#ifndef PUBSUB_SERIALIZER_SERVICE_H_
+#define PUBSUB_SERIALIZER_SERVICE_H_
 
 #include "service_reference.h"
 
 #include "pubsub_common.h"
 
-typedef struct _pubsub_message_type pubsub_message_type;
-
-typedef struct pubsub_serializer *pubsub_serializer_pt;
-
-struct pubsub_serializer_service {
+/**
+ * There should be a pubsub_msg_serializer_t
+ * per msg type (msg id) per bundle
+ *
+ * The pubsub_serializer_service can create
+ * a pubsub_msg_serializer_map_t per bundle. Potentially using
+ * the extender pattern.
+ */
+typedef struct pubsub_msg_serializer {
+    void* handle;
+    unsigned int msgId;
+    const char* msgName;
+    version_pt msgVersion;
 
-       pubsub_serializer_pt serializer;
+    celix_status_t (*serialize)(void* handle, const void* input, char** out, 
size_t* outLen);
+    celix_status_t (*deserialize)(void* handle, const char* input, size_t 
inputLen, void** out); //note inputLen can be 0 if predefined size is not needed
 
-       celix_status_t (*serialize)(pubsub_serializer_pt serializer, 
pubsub_message_type *msgType, const void *input, void **output, int *outputLen);
-       celix_status_t (*deserialize)(pubsub_serializer_pt serializer, 
pubsub_message_type *msgType, const void *input, void **output);
+    void (*freeMsg)(void* handle, void* msg);
+} pubsub_msg_serializer_t;
 
-       void (*fillMsgTypesMap)(pubsub_serializer_pt serializer, hash_map_pt 
msgTypesMap,bundle_pt bundle);
-       void (*emptyMsgTypesMap)(pubsub_serializer_pt serializer, hash_map_pt 
msgTypesMap);
+typedef struct pubsub_msg_serializer_map {
+    bundle_pt bundle;
+    hash_map_pt serializers; //key = msg id (unsigned int), value = 
pubsub_msg_serializer_t*
+} pubsub_msg_serializer_map_t;
 
-       version_pt (*getVersion)(pubsub_serializer_pt serializer, 
pubsub_message_type *msgType);
-       char* (*getName)(pubsub_serializer_pt serializer, pubsub_message_type 
*msgType);
-       void (*freeMsg)(pubsub_serializer_pt serializer, pubsub_message_type 
*msgType, void *msg);
+typedef struct pubsub_serializer_service {
+       void* handle;
 
-};
+       celix_status_t (*createSerializerMap)(void* handle, bundle_pt bundle, 
pubsub_msg_serializer_map_t** out);
+    celix_status_t (*destroySerializerMap)(void* handle, 
pubsub_msg_serializer_map_t* map);
 
-typedef struct pubsub_serializer_service *pubsub_serializer_service_pt;
+} pubsub_serializer_service_t;
 
-#endif /* PUBSUB_SERIALIZER_H_ */
+#endif /* PUBSUB_SERIALIZER_SERVICE_H_ */

Reply via email to