Repository: celix
Updated Branches:
  refs/heads/feature/CELIX-408_runtime 7efe4331a -> 2d9c77d53


CELIX-408: Refactoring of the pubsub serializer usage. The topology manager does 
not have to be started before the pubsub admins anymore.

Known issues:
 - The serializer has to be started before the admins, to prevent issues when 
using an admin without a serializer. Note: an admin should only register itself 
if a serializer is available.
 - Still considered unstable


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/2d9c77d5
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/2d9c77d5
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/2d9c77d5

Branch: refs/heads/feature/CELIX-408_runtime
Commit: 2d9c77d530184d5d3d119cd3f92202e332996c78
Parents: 7efe433
Author: Pepijn Noltes <pepijnnol...@gmail.com>
Authored: Wed Apr 12 13:27:09 2017 +0200
Committer: Pepijn Noltes <pepijnnol...@gmail.com>
Committed: Wed Apr 12 13:27:09 2017 +0200

----------------------------------------------------------------------
 pubsub/deploy/CMakeLists.txt                    |  50 ++++----
 .../include/pubsub_publish_service_private.h    |   4 +-
 .../private/src/pubsub_admin_impl.c             |   5 +-
 .../private/src/topic_publication.c             | 104 +++++++++-------
 .../private/src/topic_subscription.c            |  57 ++++++---
 .../include/pubsub_publish_service_private.h    |   4 +-
 .../private/include/topic_subscription.h        |   2 +-
 .../private/src/pubsub_admin_impl.c             |   7 +-
 .../private/src/topic_publication.c             | 121 +++++++++++--------
 .../private/src/topic_subscription.c            | 102 ++++++++++------
 .../private/src/pubsub_serializer_impl.c        |   9 +-
 .../private/src/pubsub_topology_manager.c       |  11 +-
 pubsub/test/CMakeLists.txt                      |  14 +--
 pubsub/test/msg_descriptors/msg.descriptor      |   3 +-
 pubsub/test/test/msg.h                          |   4 +-
 pubsub/test/test/sut_activator.c                |   5 +-
 pubsub/test/test/tst_activator.cpp              |  13 +-
 17 files changed, 322 insertions(+), 193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/deploy/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/deploy/CMakeLists.txt b/pubsub/deploy/CMakeLists.txt
index 52fecc9..85b49d2 100644
--- a/pubsub/deploy/CMakeLists.txt
+++ b/pubsub/deploy/CMakeLists.txt
@@ -24,12 +24,12 @@ add_deploy("pubsub_publisher_udp_mc"
     BUNDLES
        shell
        shell_tui
-       org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+               org.apache.celix.pubsub_serializer.PubSubSerializerJson
+               org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
        org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
        org.apache.celix.pubsub_admin.PubSubAdminUdpMc
        org.apache.celix.pubsub_publisher.PoiPublisher
        org.apache.celix.pubsub_publisher.PoiPublisher2
-       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 )
 
 add_deploy("pubsub_subscriber_udp_mc" 
@@ -37,11 +37,11 @@ add_deploy("pubsub_subscriber_udp_mc"
     BUNDLES
        shell
        shell_tui
-       org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+               org.apache.celix.pubsub_serializer.PubSubSerializerJson
+               org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
        org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
        org.apache.celix.pubsub_admin.PubSubAdminUdpMc
        org.apache.celix.pubsub_subscriber.PoiSubscriber
-       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 )
 
 add_deploy("pubsub_subscriber2_udp_mc" 
@@ -49,11 +49,11 @@ add_deploy("pubsub_subscriber2_udp_mc"
     BUNDLES
        shell
        shell_tui
-       org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+               org.apache.celix.pubsub_serializer.PubSubSerializerJson
+               org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
        org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
        org.apache.celix.pubsub_admin.PubSubAdminUdpMc
        org.apache.celix.pubsub_subscriber.PoiSubscriber
-       org.apache.celix.pubsub_serializer.PubSubSerializerJson
 )
 
 if (ETCD_CMD AND XTERM_CMD)
@@ -81,13 +81,13 @@ if (BUILD_PUBSUB_PSA_ZMQ)
            BUNDLES
               shell
               shell_tui
-              org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+                       org.apache.celix.pubsub_serializer.PubSubSerializerJson
+                       org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
               org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
               org.apache.celix.pubsub_admin.PubSubAdminZmq
               org.apache.celix.pubsub_admin.PubSubAdminUdpMc
               org.apache.celix.pubsub_publisher.PoiPublisher
               org.apache.celix.pubsub_publisher.PoiPublisher2
-              org.apache.celix.pubsub_serializer.PubSubSerializerJson
            PROPERTIES
               poi1.psa=zmq
               poi2.psa=udp
@@ -98,12 +98,12 @@ if (BUILD_PUBSUB_PSA_ZMQ)
            BUNDLES
               shell
               shell_tui
-              org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+                       org.apache.celix.pubsub_serializer.PubSubSerializerJson
+                       org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
               org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
               org.apache.celix.pubsub_admin.PubSubAdminZmq
               org.apache.celix.pubsub_admin.PubSubAdminUdpMc
               org.apache.celix.pubsub_subscriber.PoiSubscriber
-              org.apache.celix.pubsub_serializer.PubSubSerializerJson
            PROPERTIES
               poi1.psa=zmq
               poi2.psa=udp
@@ -115,12 +115,12 @@ if (BUILD_PUBSUB_PSA_ZMQ)
            BUNDLES
               shell
               shell_tui
-              org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+                       org.apache.celix.pubsub_serializer.PubSubSerializerJson
+                       org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
               org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
               org.apache.celix.pubsub_admin.PubSubAdminZmq
               org.apache.celix.pubsub_publisher.PoiPublisher
               org.apache.celix.pubsub_subscriber.PoiSubscriber
-              org.apache.celix.pubsub_serializer.PubSubSerializerJson
        )
 
        add_deploy("pubsub_publisher_zmq"
@@ -128,12 +128,12 @@ if (BUILD_PUBSUB_PSA_ZMQ)
            BUNDLES
               shell
               shell_tui
-              org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+                       org.apache.celix.pubsub_serializer.PubSubSerializerJson
+                       org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
               org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
               org.apache.celix.pubsub_admin.PubSubAdminZmq
               org.apache.celix.pubsub_publisher.PoiPublisher
               org.apache.celix.pubsub_publisher.PoiPublisher2
-              org.apache.celix.pubsub_serializer.PubSubSerializerJson
                PROPERTIES
                    pubsub.scope=my_small_scope
        )
@@ -143,11 +143,11 @@ if (BUILD_PUBSUB_PSA_ZMQ)
            BUNDLES
               shell
               shell_tui
+                       org.apache.celix.pubsub_serializer.PubSubSerializerJson
               org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
               org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
               org.apache.celix.pubsub_admin.PubSubAdminZmq
               org.apache.celix.pubsub_subscriber.PoiSubscriber
-              org.apache.celix.pubsub_serializer.PubSubSerializerJson
        )
 
        add_deploy("pubsub_subscriber2_zmq"
@@ -155,11 +155,12 @@ if (BUILD_PUBSUB_PSA_ZMQ)
            BUNDLES
               shell
               shell_tui
+                       org.apache.celix.pubsub_serializer.PubSubSerializerJson
               org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
               org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
               org.apache.celix.pubsub_admin.PubSubAdminZmq
               org.apache.celix.pubsub_subscriber.PoiSubscriber
-              org.apache.celix.pubsub_serializer.PubSubSerializerJson
+
        )
 
        # ZMQ Multipart
@@ -168,23 +169,23 @@ if (BUILD_PUBSUB_PSA_ZMQ)
            BUNDLES
               shell
               shell_tui
+                       org.apache.celix.pubsub_serializer.PubSubSerializerJson
               org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
               org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
               org.apache.celix.pubsub_admin.PubSubAdminZmq
               org.apache.celix.pubsub_subscriber.MpSubscriber
-              org.apache.celix.pubsub_serializer.PubSubSerializerJson
        )
 
        add_deploy("pubsub_mp_publisher_zmq"
            GROUP "pubsub"
            BUNDLES
-              shell
-              shell_tui
-              org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
-              org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
-              org.apache.celix.pubsub_admin.PubSubAdminZmq
-              org.apache.celix.pubsub_publisher.MpPublisher
-              org.apache.celix.pubsub_serializer.PubSubSerializerJson
+                       shell
+                       shell_tui
+                       org.apache.celix.pubsub_serializer.PubSubSerializerJson
+                       org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+                       
org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
+                       org.apache.celix.pubsub_admin.PubSubAdminZmq
+                       org.apache.celix.pubsub_publisher.MpPublisher
        )
 
        if (ETCD_CMD AND XTERM_CMD)
@@ -207,6 +208,7 @@ if (BUILD_PUBSUB_PSA_ZMQ)
                        GROUP pubsub
                        DEPLOYMENTS
                                pubsub_publisher
+                               pubsub_subscriber_zmq
                                pubsub_subscriber2_zmq
                        COMMANDS
                                etcd

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
----------------------------------------------------------------------
diff --git 
a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h 
b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
index 57d942a..b43fb08 100644
--- 
a/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
+++ 
b/pubsub/pubsub_admin_udp_mc/private/include/pubsub_publish_service_private.h
@@ -42,13 +42,13 @@ typedef struct pubsub_udp_msg {
 } pubsub_udp_msg_t;
 
 typedef struct topic_publication *topic_publication_pt;
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, 
pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* 
bindIP, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, 
pubsub_endpoint_pt pubEP, char* bindIP, topic_publication_pt *out);
 celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
 
 celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep);
 celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep);
 
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, 
pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, 
pubsub_serializer_service_t* serializerSvc);
 celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt 
pub, pubsub_serializer_service_t* serializerSvc);
 
 celix_status_t pubsub_topicPublicationStart(bundle_context_pt 
bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c 
b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
index ebfe3e6..3693970 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/pubsub_admin_impl.c
@@ -431,7 +431,8 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt 
admin,pubsub_endpoint_
 
                if (factory == NULL) {
                        topic_publication_pt pub = NULL;
-                       status = 
pubsub_topicPublicationCreate(admin->sendSocket, pubEP, admin->serializerSvc, 
admin->mcIpAddress,&pub);
+                       status = 
pubsub_topicPublicationCreate(admin->sendSocket, pubEP, 
admin->mcIpAddress,&pub);
+                       pubsub_topicPublicationSetSerializer(pub, 
admin->serializerSvc);
                        if(status == CELIX_SUCCESS){
                                status = 
pubsub_topicPublicationStart(admin->bundle_context,pub,&factory);
                                if(status==CELIX_SUCCESS && factory !=NULL){
@@ -655,7 +656,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt 
admin, pubsub_serialize
        while(hashMapIterator_hasNext(lp_iter)){
                service_factory_pt factory = (service_factory_pt) 
hashMapIterator_nextValue(lp_iter);
                topic_publication_pt topic_pub = (topic_publication_pt) 
factory->handle;
-               pubsub_topicPublicationAddSerializer(topic_pub, 
admin->serializerSvc);
+               pubsub_topicPublicationSetSerializer(topic_pub, 
admin->serializerSvc);
        }
        hashMapIterator_destroy(lp_iter);
        celixThreadMutex_unlock(&admin->localPublicationsLock);

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c 
b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
index be0a433..227761b 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
@@ -63,7 +63,7 @@ struct topic_publication {
 
 typedef struct publish_bundle_bound_service {
        topic_publication_pt parent;
-       pubsub_publisher_pt service;
+       pubsub_publisher_t pubSvc;
        bundle_pt bundle;
     char *scope;
        char *topic;
@@ -97,7 +97,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const 
char* msgType, unsig
 static void delay_first_send_for_late_joiners(void);
 
 
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, 
pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* 
bindIP, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, 
pubsub_endpoint_pt pubEP, char* bindIP, topic_publication_pt *out){
 
     char* ep = malloc(EP_ADDRESS_LEN);
     memset(ep,0,EP_ADDRESS_LEN);
@@ -116,7 +116,7 @@ celix_status_t pubsub_topicPublicationCreate(int 
sendSocket, pubsub_endpoint_pt
        pub->destAddr.sin_family = AF_INET;
        pub->destAddr.sin_addr.s_addr = inet_addr(bindIP);
        pub->destAddr.sin_port = htons(port);
-       pub->serializerSvc = serializer;
+       pub->serializerSvc = NULL;
 
        pubsub_topicPublicationAddPublisherEP(pub,pubEP);
 
@@ -222,30 +222,36 @@ celix_status_t 
pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub
        return CELIX_SUCCESS;
 }
 
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, 
pubsub_serializer_service_t* serializerSvc){
+celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, 
pubsub_serializer_service_t* serializerSvc){
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&(pub->tp_lock));
 
     //clear old serializer
     if (pub->serializerSvc != NULL) {
-        hash_map_iterator_t iter = 
hashMapIterator_construct(pub->boundServices); //key = bundle , value = svc
+        hash_map_iterator_t iter = 
hashMapIterator_construct(pub->boundServices); //key = bundle_pt, 
publish_bundle_bound_service_t*
         while (hashMapIterator_hasNext(&iter)) {
             publish_bundle_bound_service_t* bound = 
hashMapIterator_nextValue(&iter);
+            celixThreadMutex_lock(&bound->mp_lock);
                        
pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, 
bound->map);
+            celixThreadMutex_unlock(&bound->mp_lock);
                        bound->map = NULL;
         }
     }
 
     //setup new serializer
        pub->serializerSvc = serializerSvc;
-       hash_map_iterator_t iter = 
hashMapIterator_construct(pub->boundServices);
-       while (hashMapIterator_hasNext(&iter)) {
-               hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
-               bundle_pt bundle = hashMapEntry_getKey(entry);
-               publish_bundle_bound_service_t* bound = 
hashMapEntry_getValue(entry);
-               
pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, 
&bound->map);
-       }
+    if (pub->serializerSvc != NULL) {
+        hash_map_iterator_t iter = 
hashMapIterator_construct(pub->boundServices);
+        while (hashMapIterator_hasNext(&iter)) {
+            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+            bundle_pt bundle = hashMapEntry_getKey(entry);
+            publish_bundle_bound_service_t *bound = 
hashMapEntry_getValue(entry);
+            celixThreadMutex_lock(&bound->mp_lock);
+            
pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, 
&bound->map);
+            celixThreadMutex_unlock(&bound->mp_lock);
+        }
+    }
 
        celixThreadMutex_unlock(&(pub->tp_lock));
 
@@ -260,7 +266,9 @@ celix_status_t 
pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub,
         hash_map_iterator_t iter = 
hashMapIterator_construct(pub->boundServices);
         while (hashMapIterator_hasNext(&iter)) {
             publish_bundle_bound_service_t* bound = 
hashMapIterator_nextValue(&iter);
+            celixThreadMutex_lock(&bound->mp_lock);
             
pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, 
bound->map);
+            celixThreadMutex_unlock(&bound->mp_lock);
                        bound->map = NULL;
         }
     }
@@ -294,7 +302,7 @@ static celix_status_t 
pubsub_topicPublicationGetService(void* handle, bundle_pt
        }
 
        if (bound != NULL) {
-               *service = bound->service;
+               *service = &bound->pubSvc;
        }
 
        celixThreadMutex_unlock(&(publish->tp_lock));
@@ -374,13 +382,21 @@ static int pubsub_topicPublicationSend(void* handle, 
unsigned int msgTypeId, con
 
     if (bound->map == NULL) {
         printf("TP: Serializer is not set!\n");
+        status = 1;
     } else if (msgSer == NULL ){
         printf("TP: No msg serializer available for msg type id %d\n", 
msgTypeId);
+        hash_map_iterator_t iter = 
hashMapIterator_construct(bound->map->serializers);
+        printf("Note supported messages:\n");
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_msg_serializer_t *msgSer = hashMapIterator_nextValue(&iter);
+            printf("\tmsg %s with id %d\n", msgSer->msgName, msgSer->msgId);
+        }
+        status = 1;
     }
 
     int major=0, minor=0;
 
-    if (msgSer != NULL) {
+    if (status == 0 && msgSer != NULL) {
                pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct 
pubsub_msg_header));
                strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
                msg_hdr->type = msgTypeId;
@@ -407,9 +423,6 @@ static int pubsub_topicPublicationSend(void* handle, 
unsigned int msgTypeId, con
                free(msg);
                free(serializedOutput);
 
-    } else {
-        printf("TP: Message %u not supported.\n",msgTypeId);
-        status=-1;
     }
 
     celixThreadMutex_unlock(&(bound->mp_lock));
@@ -418,9 +431,30 @@ static int pubsub_topicPublicationSend(void* handle, 
unsigned int msgTypeId, con
     return status;
 }
 
-static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, 
unsigned int* msgTypeId){
-       *msgTypeId = utils_stringHash(msgType);
-       return 0;
+static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, 
unsigned int* out){
+    publish_bundle_bound_service_t* bound = handle;
+    unsigned int msgTypeId = 0;
+
+    celixThreadMutex_lock(&bound->mp_lock);
+    if (bound->map != NULL) {
+        hash_map_iterator_t iter = 
hashMapIterator_construct(bound->map->serializers);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter);
+            if (strncmp(msgType, msgSer->msgName, 1024*1024) == 0) {
+                msgTypeId = msgSer->msgId;
+                break;
+            }
+        }
+    }
+    celixThreadMutex_unlock(&bound->mp_lock);
+
+    if (msgTypeId != 0) {
+        *out = msgTypeId;
+        return 0;
+    } else {
+        printf("TP: Cannot find msg type id for msg type %s\n", msgType);
+        return 1;
+    }
 }
 
 
@@ -432,15 +466,10 @@ static unsigned int rand_range(unsigned int min, unsigned 
int max){
 }
 
 static publish_bundle_bound_service_t* 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle) {
-
-       publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound));
+    //PRECOND lock on publish->tp_lock
+    publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound));
 
        if (bound != NULL) {
-               bound->service = calloc(1, sizeof(*bound->service));
-       }
-
-       if (bound != NULL && bound->service != NULL) {
-
                bound->parent = tp;
                bound->bundle = bundle;
                bound->getCount = 1;
@@ -452,21 +481,17 @@ static publish_bundle_bound_service_t* 
pubsub_createPublishBundleBoundService(to
                bound->scope=strdup(pubEP->scope);
                bound->topic=strdup(pubEP->topic);
                bound->largeUdpHandle = largeUdp_create(1);
-               bound->service->handle = bound;
-               bound->service->localMsgTypeIdForMsgType = 
pubsub_localMsgTypeIdForUUID;
-               bound->service->send = pubsub_topicPublicationSend;
-               bound->service->sendMultipart = NULL;  //Multipart not 
supported (jet) for UDP
+               bound->pubSvc.handle = bound;
+               bound->pubSvc.localMsgTypeIdForMsgType = 
pubsub_localMsgTypeIdForUUID;
+               bound->pubSvc.send = pubsub_topicPublicationSend;
+               bound->pubSvc.sendMultipart = NULL;  //Multipart not supported 
(jet) for UDP
 
-        //TODO check if lock on tp is needed? (e.g. is lock already done by 
caller?)
-               if (tp->serializerSvc != NULL) {
+       if (tp->serializerSvc != NULL) {
             tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, 
bundle, &bound->map);
                }
        }
        else
        {
-               if (bound != NULL) {
-                       free(bound->service);
-               }
                free(bound);
                return NULL;
        }
@@ -475,14 +500,9 @@ static publish_bundle_bound_service_t* 
pubsub_createPublishBundleBoundService(to
 }
 
 static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* 
boundSvc) {
-
+    //PRECOND lock on publish->tp_lock
        celixThreadMutex_lock(&boundSvc->mp_lock);
 
-       if (boundSvc->service != NULL) {
-               free(boundSvc->service);
-       }
-
-    //TODO check if lock on parent is needed, e.g. does the caller already 
lock?
     if (boundSvc->map != NULL) {
         if (boundSvc->parent->serializerSvc == NULL) {
             printf("TP: Cannot destroy pubsub msg serializer map. No serliazer 
service\n");

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c 
b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
index da23b21..a424112 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
@@ -29,7 +29,6 @@
 #include <unistd.h>
 #include <signal.h>
 
-#include <sys/types.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
@@ -57,7 +56,7 @@
 #define UDP_BUFFER_SIZE         65535
 #define MAX_UDP_SESSIONS        16
 
-struct topic_subscription{
+struct topic_subscription {
 
        char* ifIpAddress;
        service_tracker_pt tracker;
@@ -70,7 +69,8 @@ struct topic_subscription{
 
     //NOTE. using a service ptr can be dangerous, because pointer can be 
reused.
     //ensuring that pointer are removed before new (refurbish) pionter comes 
along is crucial!
-       hash_map_pt msgSerializersMap; // key = service ptr, value = 
pubsub_msg_serializer_map_t*
+       hash_map_pt msgSerializerMapMap; // key = service ptr, value = 
pubsub_msg_serializer_map_t*
+    hash_map_pt bundleMap; //key = service ptr, value = bundle_pt
 
        hash_map_pt socketMap; // key = URL, value = listen-socket
        unsigned int nrSubscribers;
@@ -118,7 +118,8 @@ celix_status_t pubsub_topicSubscriptionCreate(char* 
ifIp,bundle_context_pt bundl
 
        celixThreadMutex_create(&ts->ts_lock,NULL);
        arrayList_create(&ts->sub_ep_list);
-       ts->msgSerializersMap = hashMap_create(NULL, NULL, NULL, NULL);
+       ts->msgSerializerMapMap = hashMap_create(NULL, NULL, NULL, NULL);
+    ts->bundleMap = hashMap_create(NULL, NULL, NULL, NULL);
        ts->socketMap =  hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
 
        ts->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS);
@@ -164,7 +165,8 @@ celix_status_t 
pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
        serviceTracker_destroy(ts->tracker);
        arrayList_clear(ts->sub_ep_list);
        arrayList_destroy(ts->sub_ep_list);
-       hashMap_destroy(ts->msgSerializersMap,false,false);
+       hashMap_destroy(ts->msgSerializerMapMap, false, false);
+    hashMap_destroy(ts->bundleMap, false, false);
 
        hashMap_destroy(ts->socketMap,false,false);
        largeUdp_destroy(ts->largeUdpHandle);
@@ -394,13 +396,34 @@ unsigned int 
pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
        return ts->nrSubscribers;
 }
 
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, 
pubsub_serializer_service_t* serializerSvc){
+celix_status_t pubsub_topicSubscriptionSetSerializer(topic_subscription_pt ts, 
pubsub_serializer_service_t* serializerSvc){
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&ts->ts_lock);
+    //clear old
+    if (ts->serializerSvc != NULL) {
+        hash_map_iterator_t iter = 
hashMapIterator_construct(ts->msgSerializerMapMap);
+        while (hashMapIterator_hasNext(&iter)) {
+            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+            pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry);
+            pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
+            ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, 
map);
+            hashMap_put(ts->msgSerializerMapMap, subsvc, NULL);
 
+        }
+    }
        ts->serializerSvc = serializerSvc;
-
+    //init new
+    if (ts->serializerSvc != NULL) {
+        hash_map_iterator_t iter = 
hashMapIterator_construct(ts->msgSerializerMapMap);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_subscriber_t* subsvc = hashMapIterator_nextKey(&iter);
+            bundle_pt bundle = hashMap_get(ts->bundleMap, subsvc);
+            pubsub_msg_serializer_map_t* map = NULL;
+            ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, 
bundle, &map);
+            hashMap_put(ts->msgSerializerMapMap, subsvc, map);
+        }
+    }
        celixThreadMutex_unlock(&ts->ts_lock);
 
        return status;
@@ -411,10 +434,13 @@ celix_status_t 
pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts
 
        celixThreadMutex_lock(&ts->ts_lock);
        if (ts->serializerSvc == serializerSvc) { //only act if svc removed is 
services used
-               hash_map_iterator_t iter = 
hashMapIterator_construct(ts->msgSerializersMap);
+               hash_map_iterator_t iter = 
hashMapIterator_construct(ts->msgSerializerMapMap);
                while (hashMapIterator_hasNext(&iter)) {
-            pubsub_msg_serializer_map_t* map = 
hashMapIterator_nextValue(&iter);
+            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+            pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry);
+            pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
             ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, 
map);
+            hashMap_put(ts->msgSerializerMapMap, subsvc, NULL);
                }
                ts->serializerSvc = NULL;
        }
@@ -428,7 +454,7 @@ static celix_status_t topicsub_subscriberTracked(void * 
handle, service_referenc
        topic_subscription_pt ts = handle;
 
        celixThreadMutex_lock(&ts->ts_lock);
-       if (!hashMap_containsKey(ts->msgSerializersMap, svc)) {
+       if (!hashMap_containsKey(ts->msgSerializerMapMap, svc)) {
                bundle_pt bundle = NULL;
                serviceReference_getBundle(reference, &bundle);
 
@@ -436,7 +462,8 @@ static celix_status_t topicsub_subscriberTracked(void * 
handle, service_referenc
             pubsub_msg_serializer_map_t* map = NULL;
             ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, 
bundle, &map);
             if (map != NULL) {
-                hashMap_put(ts->msgSerializersMap, svc, map);
+                hashMap_put(ts->msgSerializerMapMap, svc, map);
+                hashMap_put(ts->bundleMap, svc, bundle);
             }
                }
        }
@@ -452,10 +479,12 @@ static celix_status_t topicsub_subscriberUntracked(void * 
handle, service_refere
 
 
     celixThreadMutex_lock(&ts->ts_lock);
-       if (hashMap_containsKey(ts->msgSerializersMap, svc)) {
-               pubsub_msg_serializer_map_t* map = 
hashMap_remove(ts->msgSerializersMap, svc);
+       if (hashMap_containsKey(ts->msgSerializerMapMap, svc)) {
+               pubsub_msg_serializer_map_t* map = 
hashMap_remove(ts->msgSerializerMapMap, svc);
                if (ts->serializerSvc != NULL){
                        
ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
+            hashMap_remove(ts->bundleMap, svc);
+            hashMap_remove(ts->msgSerializerMapMap, svc);
                }
        }
        celixThreadMutex_unlock(&ts->ts_lock);
@@ -467,7 +496,7 @@ static celix_status_t topicsub_subscriberUntracked(void * 
handle, service_refere
 
 static void process_msg(topic_subscription_pt sub, pubsub_udp_msg_t* msg){
 
-       hash_map_iterator_t iter = 
hashMapIterator_construct(sub->msgSerializersMap);
+       hash_map_iterator_t iter = 
hashMapIterator_construct(sub->msgSerializerMapMap);
        celixThreadMutex_lock(&sub->ts_lock);
        while (hashMapIterator_hasNext(&iter)) {
                hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
----------------------------------------------------------------------
diff --git 
a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h 
b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
index 158dfe7..dbd2ff1 100644
--- a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
+++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
@@ -34,13 +34,13 @@
 
 typedef struct topic_publication *topic_publication_pt;
 
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt 
bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* 
serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, 
topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt 
bundle_context,pubsub_endpoint_pt pubEP, char* bindIP, unsigned int basePort, 
unsigned int maxPort, topic_publication_pt *out);
 celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
 
 celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep);
 celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep);
 
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, 
pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, 
pubsub_serializer_service_t* serializerSvc);
 celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt 
pub, pubsub_serializer_service_t* serializerSvc);
 
 celix_status_t pubsub_topicPublicationStart(bundle_context_pt 
bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h 
b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
index 1fbbaaf..c1e78c3 100644
--- a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
+++ b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
@@ -51,7 +51,7 @@ celix_status_t 
pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
 celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, 
pubsub_endpoint_pt subEP);
 celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt 
ts, pubsub_endpoint_pt subEP);
 
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, 
pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsub_topicSubscriptionSetSerializer(topic_subscription_pt ts, 
pubsub_serializer_service_t* serializerSvc);
 celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt 
ts, pubsub_serializer_service_t* serializerSvc);
 
 celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt 
subscription);

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c 
b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
index 5c9a5d5..6095d8a 100644
--- a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
@@ -444,7 +444,8 @@ celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt 
admin, pubsub_endpoint
 
         if (factory == NULL) {
             topic_publication_pt pub = NULL;
-            status = pubsub_topicPublicationCreate(admin->bundle_context, 
pubEP, admin->serializerSvc, admin->ipAddress, admin->basePort, admin->maxPort, 
&pub);
+            status = pubsub_topicPublicationCreate(admin->bundle_context, 
pubEP, admin->ipAddress, admin->basePort, admin->maxPort, &pub);
+                       pubsub_topicPublicationSetSerializer(pub, 
admin->serializerSvc);
             if (status == CELIX_SUCCESS) {
                 status = pubsub_topicPublicationStart(admin->bundle_context, 
pub, &factory);
                 if (status == CELIX_SUCCESS && factory != NULL) {
@@ -676,7 +677,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt 
admin, pubsub_serialize
        while(hashMapIterator_hasNext(lp_iter)){
                service_factory_pt factory = (service_factory_pt) 
hashMapIterator_nextValue(lp_iter);
                topic_publication_pt topic_pub = (topic_publication_pt) 
factory->handle;
-               pubsub_topicPublicationAddSerializer(topic_pub, 
admin->serializerSvc);
+               pubsub_topicPublicationSetSerializer(topic_pub, 
admin->serializerSvc);
        }
        hashMapIterator_destroy(lp_iter);
        celixThreadMutex_unlock(&admin->localPublicationsLock);
@@ -686,7 +687,7 @@ celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt 
admin, pubsub_serialize
        hash_map_iterator_pt subs_iter = 
hashMapIterator_create(admin->subscriptions);
        while(hashMapIterator_hasNext(subs_iter)){
                topic_subscription_pt topic_sub = (topic_subscription_pt) 
hashMapIterator_nextValue(subs_iter);
-               pubsub_topicSubscriptionAddSerializer(topic_sub, 
admin->serializerSvc);
+               pubsub_topicSubscriptionSetSerializer(topic_sub, 
admin->serializerSvc);
        }
        hashMapIterator_destroy(subs_iter);
        celixThreadMutex_unlock(&admin->subscriptionsLock);

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c 
b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
index 2e95874..28bf56e 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
@@ -74,7 +74,7 @@ struct topic_publication {
 
 typedef struct publish_bundle_bound_service {
        topic_publication_pt parent;
-       pubsub_publisher_pt service;
+       pubsub_publisher_t pubSvc;
        bundle_pt bundle;
        char *topic;
        pubsub_msg_serializer_map_t* map;
@@ -104,7 +104,7 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const 
char* msgType, unsig
 
 static void delay_first_send_for_late_joiners(void);
 
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, 
pubsub_endpoint_pt pubEP, pubsub_serializer_service_t* serializer, char* 
bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, 
pubsub_endpoint_pt pubEP, char* bindIP, unsigned int basePort, unsigned int 
maxPort, topic_publication_pt *out){
        celix_status_t status = CELIX_SUCCESS;
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
@@ -209,7 +209,7 @@ celix_status_t 
pubsub_topicPublicationCreate(bundle_context_pt bundle_context, p
 
        pub->endpoint = ep;
        pub->zmq_socket = socket;
-       pub->serializerSvc = serializer;
+       pub->serializerSvc = NULL;
 
 #ifdef BUILD_WITH_ZMQ_SECURITY
        if (pubEP->is_secure){
@@ -298,9 +298,7 @@ celix_status_t 
pubsub_topicPublicationStop(topic_publication_pt pub){
        celix_status_t status = CELIX_SUCCESS;
 
        //celixThreadMutex_lock(&(pub->tp_lock));
-
        status = serviceRegistration_unregister(pub->svcFactoryReg);
-
        //celixThreadMutex_unlock(&(pub->tp_lock));
 
        return status;
@@ -331,7 +329,7 @@ celix_status_t 
pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub
        return CELIX_SUCCESS;
 }
 
-celix_status_t pubsub_topicPublicationAddSerializer(topic_publication_pt pub, 
pubsub_serializer_service_t* serializerSvc){
+celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, 
pubsub_serializer_service_t* serializerSvc){
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&(pub->tp_lock));
@@ -341,19 +339,25 @@ celix_status_t 
pubsub_topicPublicationAddSerializer(topic_publication_pt pub, pu
                hash_map_iterator_t iter = 
hashMapIterator_construct(pub->boundServices);
                while (hashMapIterator_hasNext(&iter)) {
                        publish_bundle_bound_service_t* bound = 
hashMapIterator_nextValue(&iter);
-                       
pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, 
bound->map);
+            celixThreadMutex_lock(&bound->mp_lock);
+            
pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, 
bound->map);
+            celixThreadMutex_unlock(&bound->mp_lock);
                        bound->map = NULL;
                }
        }
 
        pub->serializerSvc = serializerSvc;
-       hash_map_iterator_t iter = 
hashMapIterator_construct(pub->boundServices);
-       while (hashMapIterator_hasNext(&iter)) {
-               hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
-               bundle_pt bundle = hashMapEntry_getKey(entry);
-               publish_bundle_bound_service_t* boundSvc = 
hashMapEntry_getValue(entry);
-               
pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, 
&boundSvc->map);
-       }
+    if (pub->serializerSvc != NULL) {
+        hash_map_iterator_t iter = 
hashMapIterator_construct(pub->boundServices);
+        while (hashMapIterator_hasNext(&iter)) {
+            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+            bundle_pt bundle = hashMapEntry_getKey(entry);
+            publish_bundle_bound_service_t* bound = 
hashMapEntry_getValue(entry);
+            celixThreadMutex_lock(&bound->mp_lock);
+            
pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, 
&bound->map);
+            celixThreadMutex_unlock(&bound->mp_lock);
+        }
+    }
 
        celixThreadMutex_unlock(&(pub->tp_lock));
 
@@ -367,9 +371,11 @@ celix_status_t 
pubsub_topicPublicationRemoveSerializer(topic_publication_pt pub,
        if (pub->serializerSvc == svc) {
                hash_map_iterator_t iter = 
hashMapIterator_construct(pub->boundServices);
                while (hashMapIterator_hasNext(&iter)) {
-                       publish_bundle_bound_service_t *boundSvc = 
hashMapIterator_nextValue(&iter);
-                       
pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, 
boundSvc->map);
-                       boundSvc->map = NULL;
+                       publish_bundle_bound_service_t* bound = 
hashMapIterator_nextValue(&iter);
+            celixThreadMutex_lock(&bound->mp_lock);
+                       
pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, 
bound->map);
+            celixThreadMutex_unlock(&bound->mp_lock);
+                       bound->map = NULL;
                }
                pub->serializerSvc = NULL;
        }
@@ -402,7 +408,7 @@ static celix_status_t 
pubsub_topicPublicationGetService(void* handle, bundle_pt
        }
 
        if(bound!=NULL){
-               *service = bound->service;
+               *service = &bound->pubSvc;
        }
 
        celixThreadMutex_unlock(&(publish->tp_lock));
@@ -489,29 +495,41 @@ static bool send_pubsub_mp_msg(zsock_t* zmq_socket, 
array_list_pt mp_msg_parts){
 }
 
 static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, 
const void *msg) {
-
        return pubsub_topicPublicationSendMultipart(handle,msgTypeId,msg, 
PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG);
-
 }
 
 static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int 
msgTypeId, const void *msg, int flags){
        int status = 0;
        publish_bundle_bound_service_t* bound = handle;
-
        celixThreadMutex_lock(&(bound->mp_lock));
+
        if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & 
PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg
                printf("TP: Multipart send already in progress. Cannot process 
a new one.\n");
                celixThreadMutex_unlock(&(bound->mp_lock));
                return -3;
-       }
+    }
 
        pubsub_msg_serializer_t* msgSer = NULL;
        if (bound->map != NULL) {
                msgSer = hashMap_get(bound->map->serializers, 
(void*)(uintptr_t)msgTypeId);
        }
-       int major=0, minor=0;
 
-       if (msgSer != NULL) {
+    if (bound->map == NULL) {
+        printf("TP: Serializer is not set!\n");
+        status = 1;
+    } else if (msgSer == NULL ){
+        printf("TP: No msg serializer available for msg type id %d\n", 
msgTypeId);
+        hash_map_iterator_t iter = 
hashMapIterator_construct(bound->map->serializers);
+        printf("Note supported messages:\n");
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_msg_serializer_t *msgSer = hashMapIterator_nextValue(&iter);
+            printf("\tmsg %s with id %d\n", msgSer->msgName, msgSer->msgId);
+        }
+        status = 1;
+    }
+
+       int major=0, minor=0;
+       if (status == 0 && msgSer != NULL) {
                pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct 
pubsub_msg_header));
                strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
                msg_hdr->type = msgTypeId;
@@ -579,19 +597,38 @@ static int pubsub_topicPublicationSendMultipart(void 
*handle, unsigned int msgTy
                }
 
        } else {
-               printf("TP: Message %u not supported.",msgTypeId);
+               printf("TP: Message %u not supported.\n",msgTypeId);
                status=-1;
        }
 
        celixThreadMutex_unlock(&(bound->mp_lock));
-
        return status;
-
 }
 
-static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, 
unsigned int* msgTypeId){
-       *msgTypeId = utils_stringHash(msgType);
-       return 0;
+static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, 
unsigned int* out){
+       publish_bundle_bound_service_t* bound = handle;
+       unsigned int msgTypeId = 0;
+
+    celixThreadMutex_lock(&bound->mp_lock);
+       if (bound->map != NULL) {
+               hash_map_iterator_t iter = 
hashMapIterator_construct(bound->map->serializers);
+               while (hashMapIterator_hasNext(&iter)) {
+                       pubsub_msg_serializer_t* msgSer = 
hashMapIterator_nextValue(&iter);
+                       if (strncmp(msgType, msgSer->msgName, 1024*1024) == 0) {
+                               msgTypeId = msgSer->msgId;
+                               break;
+                       }
+               }
+       }
+    celixThreadMutex_unlock(&bound->mp_lock);
+
+       if (msgTypeId != 0) {
+               *out = msgTypeId;
+               return 0;
+       } else {
+               printf("TP: Cannot find msg type id for msg type %s\n", 
msgType);
+               return 1;
+       }
 }
 
 static unsigned int rand_range(unsigned int min, unsigned int max){
@@ -602,14 +639,11 @@ static unsigned int rand_range(unsigned int min, unsigned 
int max){
 }
 
 static publish_bundle_bound_service_t* 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle){
+       //PRECOND lock on tp->tp_lock
 
        publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound));
 
        if (bound != NULL) {
-               bound->service = calloc(1, sizeof(*bound->service));
-       }
-
-       if (bound != NULL && bound->service != NULL) {
 
                bound->parent = tp;
                bound->bundle = bundle;
@@ -617,7 +651,6 @@ static publish_bundle_bound_service_t* 
pubsub_createPublishBundleBoundService(to
                bound->mp_send_in_progress = false;
                celixThreadMutex_create(&bound->mp_lock,NULL);
 
-               //TODO check if lock is needed. e.g. was the caller already 
locked?
                if (tp->serializerSvc != NULL) {
                        
tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, bundle, 
&bound->map);
                }
@@ -626,17 +659,13 @@ static publish_bundle_bound_service_t* 
pubsub_createPublishBundleBoundService(to
                pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
                bound->topic=strdup(pubEP->topic);
 
-               bound->service->handle = bound;
-               bound->service->localMsgTypeIdForMsgType = 
pubsub_localMsgTypeIdForUUID;
-               bound->service->send = pubsub_topicPublicationSend;
-               bound->service->sendMultipart = 
pubsub_topicPublicationSendMultipart;
-
+               bound->pubSvc.handle = bound;
+               bound->pubSvc.localMsgTypeIdForMsgType = 
pubsub_localMsgTypeIdForUUID;
+               bound->pubSvc.send = pubsub_topicPublicationSend;
+               bound->pubSvc.sendMultipart = 
pubsub_topicPublicationSendMultipart;
        }
        else
        {
-               if (bound != NULL) {
-                       free(bound->service);
-               }
                free(bound);
                return NULL;
        }
@@ -645,13 +674,9 @@ static publish_bundle_bound_service_t* 
pubsub_createPublishBundleBoundService(to
 }
 
 static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* 
boundSvc){
-
+       //PRECOND lock on publish->tp_lock
        celixThreadMutex_lock(&boundSvc->mp_lock);
 
-       if (boundSvc->service != NULL) {
-               free(boundSvc->service);
-       }
-
        if (boundSvc->map != NULL && boundSvc->parent->serializerSvc != NULL) {
                
boundSvc->parent->serializerSvc->destroySerializerMap(boundSvc->parent->serializerSvc->handle,
 boundSvc->map);
                boundSvc->map = NULL;

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c 
b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
index 7ef2c5d..537fbe5 100644
--- a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
@@ -69,7 +69,8 @@ struct topic_subscription {
        celix_thread_mutex_t ts_lock;
        bundle_context_pt context;
 
-       hash_map_pt msgSerializers; // key = service ptr, value = 
pubsub_msg_serializer_map_t*
+       hash_map_pt msgSerializerMapMap; // key = service ptr, value = 
pubsub_msg_serializer_map_t*
+    hash_map_pt bundleMap; //key = service ptr, value = bundle_pt
        array_list_pt pendingConnections;
        array_list_pt pendingDisconnections;
 
@@ -220,7 +221,8 @@ celix_status_t 
pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context,
        celixThreadMutex_create(&ts->socket_lock, NULL);
        celixThreadMutex_create(&ts->ts_lock,NULL);
        arrayList_create(&ts->sub_ep_list);
-       ts->msgSerializers = hashMap_create(NULL, NULL, NULL, NULL);
+       ts->msgSerializerMapMap = hashMap_create(NULL, NULL, NULL, NULL);
+    ts->bundleMap = hashMap_create(NULL, NULL, NULL, NULL);
        arrayList_create(&ts->pendingConnections);
        arrayList_create(&ts->pendingDisconnections);
        celixThreadMutex_create(&ts->pendingConnections_lock, NULL);
@@ -266,7 +268,8 @@ celix_status_t 
pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
        serviceTracker_destroy(ts->tracker);
        arrayList_clear(ts->sub_ep_list);
        arrayList_destroy(ts->sub_ep_list);
-       hashMap_destroy(ts->msgSerializers,false,false);
+       hashMap_destroy(ts->msgSerializerMapMap,false,false);
+    hashMap_destroy(ts->bundleMap,false,false);
 
        celixThreadMutex_lock(&ts->pendingConnections_lock);
        arrayList_destroy(ts->pendingConnections);
@@ -426,13 +429,34 @@ unsigned int 
pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
        return ts->nrSubscribers;
 }
 
-celix_status_t pubsub_topicSubscriptionAddSerializer(topic_subscription_pt ts, 
pubsub_serializer_service_t* serializerSvc){
+celix_status_t pubsub_topicSubscriptionSetserializer(topic_subscription_pt ts, 
pubsub_serializer_service_t* serializerSvc){
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&ts->ts_lock);
-
-       ts->serializerSvc = serializerSvc;
-
+    //clear old
+    if (ts->serializerSvc != NULL) {
+        hash_map_iterator_t iter = 
hashMapIterator_construct(ts->msgSerializerMapMap);
+        while (hashMapIterator_hasNext(&iter)) {
+            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+            pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry);
+            pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
+            ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, 
map);
+            hashMap_put(ts->msgSerializerMapMap, subsvc, NULL);
+
+        }
+    }
+    ts->serializerSvc = serializerSvc;
+    //init new
+    if (ts->serializerSvc != NULL) {
+        hash_map_iterator_t iter = 
hashMapIterator_construct(ts->msgSerializerMapMap);
+        while (hashMapIterator_hasNext(&iter)) {
+            pubsub_subscriber_t* subsvc = hashMapIterator_nextKey(&iter);
+            bundle_pt bundle = hashMap_get(ts->bundleMap, subsvc);
+            pubsub_msg_serializer_map_t* map = NULL;
+            ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, 
bundle, &map);
+            hashMap_put(ts->msgSerializerMapMap, subsvc, map);
+        }
+    }
        celixThreadMutex_unlock(&ts->ts_lock);
 
        return status;
@@ -442,53 +466,59 @@ celix_status_t 
pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt ts
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&ts->ts_lock);
-       if (ts->serializerSvc == svc) {
-               hash_map_iterator_t iter = 
hashMapIterator_construct(ts->msgSerializers);
-               while(hashMapIterator_hasNext(&iter)){
-                       pubsub_msg_serializer_map_t* map = 
hashMapIterator_nextValue(&iter);
-                       
ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
-               }
-       }
-       ts->serializerSvc = NULL;
+    if (ts->serializerSvc == svc) { //only act if the removed svc is the service in use
+        hash_map_iterator_t iter = 
hashMapIterator_construct(ts->msgSerializerMapMap);
+        while (hashMapIterator_hasNext(&iter)) {
+            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+            pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry);
+            pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
+            ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, 
map);
+            hashMap_put(ts->msgSerializerMapMap, subsvc, NULL);
+        }
+        ts->serializerSvc = NULL;
+    }
        celixThreadMutex_unlock(&ts->ts_lock);
 
        return status;
 }
 
-static celix_status_t topicsub_subscriberTracked(void * handle, 
service_reference_pt reference, void * service){
+static celix_status_t topicsub_subscriberTracked(void * handle, 
service_reference_pt reference, void* svc) {
        celix_status_t status = CELIX_SUCCESS;
        topic_subscription_pt ts = handle;
 
        celixThreadMutex_lock(&ts->ts_lock);
-       if (!hashMap_containsKey(ts->msgSerializers, service)) {
-               bundle_pt bundle = NULL;
-               serviceReference_getBundle(reference, &bundle);
-
-               if (ts->serializerSvc != NULL) {
-                       pubsub_msg_serializer_map_t* map = NULL;
-                       
ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, bundle, &map);
-                       if (map != NULL) {
-                               hashMap_put(ts->msgSerializers, service, map);
-                       } else {
-                               printf("TS: Cannot create msg serializer 
map\n");
-                       }
-               }
-       }
+    if (!hashMap_containsKey(ts->msgSerializerMapMap, svc)) {
+        bundle_pt bundle = NULL;
+        serviceReference_getBundle(reference, &bundle);
+
+        if (ts->serializerSvc != NULL) {
+            pubsub_msg_serializer_map_t* map = NULL;
+            ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, 
bundle, &map);
+            if (map != NULL) {
+                hashMap_put(ts->msgSerializerMapMap, svc, map);
+                hashMap_put(ts->bundleMap, svc, bundle);
+            }
+        }
+    }
        celixThreadMutex_unlock(&ts->ts_lock);
        printf("TS: New subscriber registered.\n");
        return status;
 
 }
 
-static celix_status_t topicsub_subscriberUntracked(void * handle, 
service_reference_pt reference, void * service){
+static celix_status_t topicsub_subscriberUntracked(void * handle, 
service_reference_pt reference, void* svc) {
        celix_status_t status = CELIX_SUCCESS;
        topic_subscription_pt ts = handle;
 
        celixThreadMutex_lock(&ts->ts_lock);
-       if (hashMap_containsKey(ts->msgSerializers, service)) {
-               pubsub_msg_serializer_map_t* map = 
hashMap_remove(ts->msgSerializers, service);
-        ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, 
map);
-       }
+    if (hashMap_containsKey(ts->msgSerializerMapMap, svc)) {
+        pubsub_msg_serializer_map_t* map = 
hashMap_remove(ts->msgSerializerMapMap, svc);
+        if (ts->serializerSvc != NULL){
+            ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, 
map);
+            hashMap_remove(ts->bundleMap, svc);
+            hashMap_remove(ts->msgSerializerMapMap, svc);
+        }
+    }
        celixThreadMutex_unlock(&ts->ts_lock);
 
        printf("TS: Subscriber unregistered.\n");
@@ -500,7 +530,7 @@ static void process_msg(topic_subscription_pt sub, 
array_list_pt msg_list) {
 
        pubsub_msg_header_pt first_msg_hdr = 
(pubsub_msg_header_pt)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->header);
 
-       hash_map_iterator_t iter = 
hashMapIterator_construct(sub->msgSerializers);
+       hash_map_iterator_t iter = 
hashMapIterator_construct(sub->msgSerializerMapMap);
        while (hashMapIterator_hasNext(&iter)) {
                hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
                pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry);

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c 
b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
index 2dd8258..6145a38 100644
--- a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
+++ b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c
@@ -250,14 +250,17 @@ static void 
pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle
                     bool clash = hashMap_containsKey(msgSerializers, 
(void*)(uintptr_t)msgId);
                     if (clash) {
                         printf("Cannot add msg %s. clash in msg id %d!!\n", 
msgName, msgId);
+                        free(impl);
+                        dynMessage_destroy(msgType);
                     } else if ( msgName != NULL && msgVersion != NULL && msgId 
!= 0) {
                         hashMap_put(msgSerializers, (void*)(uintptr_t)msgId, 
&impl->msgSerializer);
                     } else {
-                        printf("Error adding creating msg serializer\n");
+                        printf("Error creating msg serializer\n");
+                        free(impl);
+                        dynMessage_destroy(msgType);
                     }
 
-                }
-                else{
+                } else{
                     printf("DMU: cannot parse message from descriptor 
%s\n.",path);
                 }
                 fclose(stream);

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git 
a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c 
b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
index 36ea422..6047cf8 100644
--- a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
+++ b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c
@@ -226,10 +226,20 @@ celix_status_t pubsub_topologyManager_psaAdded(void * 
handle, service_reference_
 
        celixThreadMutex_unlock(&manager->publicationsLock);
 
+
+       celixThreadMutex_lock(&manager->serializerListLock);
+       unsigned int size = arrayList_size(manager->serializerList);
+       if (size > 0) {
+               pubsub_serializer_service_t* ser = 
arrayList_get(manager->serializerList, (size-1)); //last, same as result of 
add/remove serializer
+               new_psa->setSerializer(new_psa->admin, ser);
+       }
+       celixThreadMutex_unlock(&manager->serializerListLock);
+
        celixThreadMutex_lock(&manager->psaListLock);
        arrayList_add(manager->psaList, new_psa);
        celixThreadMutex_unlock(&manager->psaListLock);
 
+
        return status;
 }
 
@@ -335,7 +345,6 @@ celix_status_t 
pubsub_topologyManager_pubsubSerializerAdded(void* handle, servic
        logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added 
pubsub serializer");
 
        int i;
-
        for(i=0; i<arrayList_size(manager->psaList); i++){
                pubsub_admin_service_pt psa = (pubsub_admin_service_pt) 
arrayList_get(manager->psaList,i);
                psa->setSerializer(psa->admin, new_serializer);

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/test/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/test/CMakeLists.txt b/pubsub/test/CMakeLists.txt
index 7cd0003..3b1655b 100644
--- a/pubsub/test/CMakeLists.txt
+++ b/pubsub/test/CMakeLists.txt
@@ -38,11 +38,11 @@ bundle_files(pubsub_sut
 add_deploy(pubsub_udpmc_sut
     NAME deploy_sut
     BUNDLES
+        org.apache.celix.pubsub_serializer.PubSubSerializerJson
         org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+        #org.apache.celix.pubsub_admin.PubSubAdminUdpMc
+        org.apache.celix.pubsub_admin.PubSubAdminZmq
         org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
-        org.apache.celix.pubsub_admin.PubSubAdminUdpMc
-        #org.apache.celix.pubsub_admin.PubSubAdminZmq
-        org.apache.celix.pubsub_serializer.PubSubSerializerJson
         pubsub_sut
     DIR ${PROJECT_BINARY_DIR}/runtimes/test/pubsub/udpmc
 )
@@ -60,11 +60,11 @@ bundle_files(pubsub_tst
 add_deploy(pubsub_udpmc_tst
     NAME deploy_tst
     BUNDLES
-        org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
-        org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
-        org.apache.celix.pubsub_admin.PubSubAdminUdpMc
-        #org.apache.celix.pubsub_admin.PubSubAdminZmq
         org.apache.celix.pubsub_serializer.PubSubSerializerJson
+        org.apache.celix.pubsub_topology_manager.PubSubTopologyManager
+        org.apache.celix.pubsub_discovery.etcd.PubsubDiscovery
+        #org.apache.celix.pubsub_admin.PubSubAdminUdpMc
+        org.apache.celix.pubsub_admin.PubSubAdminZmq
         pubsub_tst
     DIR ${PROJECT_BINARY_DIR}/runtimes/test/pubsub/udpmc
     LAUNCHER celix_test_runner

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/test/msg_descriptors/msg.descriptor
----------------------------------------------------------------------
diff --git a/pubsub/test/msg_descriptors/msg.descriptor 
b/pubsub/test/msg_descriptors/msg.descriptor
index 03b15ba..0eb28cb 100644
--- a/pubsub/test/msg_descriptors/msg.descriptor
+++ b/pubsub/test/msg_descriptors/msg.descriptor
@@ -3,6 +3,7 @@ type=message
 name=msg
 version=1.0.0
 :annotations
+classname=org.example.Msg
 :types
 :message
-{n seqnR}
+{i seqnR}

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/test/test/msg.h
----------------------------------------------------------------------
diff --git a/pubsub/test/test/msg.h b/pubsub/test/test/msg.h
index babfd1f..49169c5 100644
--- a/pubsub/test/test/msg.h
+++ b/pubsub/test/test/msg.h
@@ -20,8 +20,10 @@
 #ifndef MSG_H
 #define MSG_H
 
+#include <stdint.h>
+
 typedef struct msg {
-    int seqNr;
+    uint32_t seqNr;
 } msg_t;
 
 #endif //MSG_H

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/test/test/sut_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/test/test/sut_activator.c b/pubsub/test/test/sut_activator.c
index 3e3b33b..6f02e79 100644
--- a/pubsub/test/test/sut_activator.c
+++ b/pubsub/test/test/sut_activator.c
@@ -19,6 +19,7 @@
 
 #include <stdio.h>
 #include <stdlib.h>
+#include <constants.h>
 
 #include "bundle_activator.h"
 #include "service_tracker.h"
@@ -57,7 +58,9 @@ celix_status_t bundleActivator_start(void * userData, 
bundle_context_pt context)
        act->reg = NULL;
        bundleContext_registerService(context, PUBSUB_SUBSCRIBER_SERVICE_NAME, 
&act->subSvc, props, &act->reg);
 
-       const char* filter = 
"(&(objectClass=pubsub.publisher)(pubsub.topic=pong))";
+       char filter[512];
+       snprintf(filter, 512, "(&(%s=%s)(%s=%s))", OSGI_FRAMEWORK_OBJECTCLASS, 
PUBSUB_PUBLISHER_SERVICE_NAME, PUBSUB_PUBLISHER_TOPIC, "pong");
+
        service_tracker_customizer_pt customizer = NULL;
        serviceTrackerCustomizer_create(act, NULL, sut_pubAdded, NULL, 
sut_pubRemoved, &customizer);
        serviceTracker_createWithFilter(context, filter, customizer, 
&act->tracker);

http://git-wip-us.apache.org/repos/asf/celix/blob/2d9c77d5/pubsub/test/test/tst_activator.cpp
----------------------------------------------------------------------
diff --git a/pubsub/test/test/tst_activator.cpp 
b/pubsub/test/test/tst_activator.cpp
index 266f73e..2cf5309 100644
--- a/pubsub/test/test/tst_activator.cpp
+++ b/pubsub/test/test/tst_activator.cpp
@@ -32,7 +32,7 @@
 
 #include <CppUTest/TestHarness.h>
 #include <CppUTestExt/MockSupport.h>
-
+#include <constants.h>
 
 
 static int tst_receive(void *handle, const char *msgType, unsigned int 
msgTypeId, void *msg, pubsub_multipart_callbacks_t *callbacks, bool *release);
@@ -70,8 +70,10 @@ celix_status_t bundleActivator_start(__attribute__((unused)) 
void * userData, bu
        g_act.subSvc.receive = tst_receive;
        bundleContext_registerService(context, PUBSUB_SUBSCRIBER_SERVICE_NAME, 
&g_act.subSvc, props, &g_act.reg);
 
-    const char* filter = 
"(&(objectClass=pubsub.publisher)(pubsub.topic=ping))";
-       service_tracker_customizer_pt customizer = NULL;
+    char filter[512];
+    snprintf(filter, 512, "(&(%s=%s)(%s=%s))", OSGI_FRAMEWORK_OBJECTCLASS, 
PUBSUB_PUBLISHER_SERVICE_NAME, PUBSUB_PUBLISHER_TOPIC, "ping");
+
+    service_tracker_customizer_pt customizer = NULL;
        serviceTrackerCustomizer_create(&g_act, NULL, tst_pubAdded, NULL, 
tst_pubRemoved, &customizer);
        serviceTracker_createWithFilter(context, filter, customizer, 
&g_act.tracker);
        serviceTracker_open(g_act.tracker);
@@ -157,7 +159,7 @@ TEST_GROUP(PUBSUB_INT_GROUP)
             if (count > 0) {
                 break;
             } else {
-                printf("No return message received, waiting for a while\n");
+                printf("No return message received, waiting for a while. 
%d/%d\n", i+1, TRIES);
             }
         }
         CHECK(count > 0);
@@ -176,7 +178,8 @@ TEST(PUBSUB_INT_GROUP, sendRecvTest) {
     constexpr int COUNT = 50;
     msg_t msg;
     for (int i = 0; i < COUNT; ++i) {
-        msg.seqNr = i;
+        msg.seqNr = i+1;
+        printf("Sending test msg %d of %d\n", i+1, COUNT);
         pthread_mutex_lock(&g_act.mutex);
         g_act.pubSvc->send(g_act.pubSvc->handle, g_act.msgId, &msg);
         pthread_mutex_unlock(&g_act.mutex);

Reply via email to