http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c 
b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
index bed5dfc..b85f0a9 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_publication.c
@@ -7,7 +7,7 @@
  *"License"); you may not use this file except in compliance
  *with the License.  You may obtain a copy of the License at
  *
- *  http://www.apache.org/licenses/LICENSE-2.0
+ *  htPSA_UDP_MC_TP://www.apache.org/licenses/LICENSE-2.0
  *
  *Unless required by applicable law or agreed to in writing,
  *software distributed under the License is distributed on an
@@ -41,11 +41,13 @@
 #include "service_factory.h"
 #include "version.h"
 
-#include "pubsub_publish_service_private.h"
+#include "topic_publication.h"
 #include "pubsub_common.h"
 #include "publisher.h"
 #include "large_udp.h"
 
+#include "pubsub_serializer.h"
+
 #define EP_ADDRESS_LEN         32
 
 #define FIRST_SEND_DELAY       2
@@ -57,28 +59,26 @@ struct topic_publication {
        array_list_pt pub_ep_list; //List<pubsub_endpoint>
        hash_map_pt boundServices; //<bundle_pt,bound_service>
        celix_thread_mutex_t tp_lock;
+       pubsub_serializer_service_t *serializer;
        struct sockaddr_in destAddr;
-       pubsub_serializer_service_t* serializerSvc;
 };
 
 typedef struct publish_bundle_bound_service {
        topic_publication_pt parent;
-       pubsub_publisher_t pubSvc;
+       pubsub_publisher_t service;
        bundle_pt bundle;
-    char *scope;
+       char *scope;
        char *topic;
+       hash_map_pt msgTypes;
        unsigned short getCount;
        celix_thread_mutex_t mp_lock;
-       bool mp_send_in_progress;
-       array_list_pt mp_parts;
        largeUdp_pt largeUdpHandle;
-       pubsub_msg_serializer_map_t* map;
-} publish_bundle_bound_service_t;
+}* publish_bundle_bound_service_pt;
 
 typedef struct pubsub_msg{
        pubsub_msg_header_pt header;
        char* payload;
-       int payloadSize;
+       size_t payloadSize;
 } pubsub_msg_t;
 
 static unsigned int rand_range(unsigned int min, unsigned int max);
@@ -86,10 +86,10 @@ static unsigned int rand_range(unsigned int min, unsigned 
int max);
 static celix_status_t pubsub_topicPublicationGetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service);
 static celix_status_t pubsub_topicPublicationUngetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service);
 
-static publish_bundle_bound_service_t* 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle);
-static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* 
boundSvc);
+static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle);
+static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt 
boundSvc);
 
-static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, 
const void *msg);
+static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, 
const void *msg);
 
 static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, 
unsigned int* msgTypeId);
 
@@ -97,12 +97,12 @@ static int pubsub_localMsgTypeIdForUUID(void* handle, const 
char* msgType, unsig
 static void delay_first_send_for_late_joiners(void);
 
 
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, 
pubsub_endpoint_pt pubEP, char* bindIP, topic_publication_pt *out){
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, 
pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* 
bindIP, topic_publication_pt *out){
 
-    char* ep = malloc(EP_ADDRESS_LEN);
-    memset(ep,0,EP_ADDRESS_LEN);
-    unsigned int port = pubEP->serviceID + 
rand_range(UDP_BASE_PORT+pubEP->serviceID+3, UDP_MAX_PORT);
-    snprintf(ep,EP_ADDRESS_LEN,"udp://%s:%u",bindIP,port);
+       char* ep = malloc(EP_ADDRESS_LEN);
+       memset(ep,0,EP_ADDRESS_LEN);
+       unsigned int port = pubEP->serviceID + 
rand_range(UDP_BASE_PORT+pubEP->serviceID+3, UDP_MAX_PORT);
+       snprintf(ep,EP_ADDRESS_LEN,"udp://%s:%u",bindIP,port);
 
 
        topic_publication_pt pub = calloc(1,sizeof(*pub));
@@ -116,7 +116,8 @@ celix_status_t pubsub_topicPublicationCreate(int 
sendSocket, pubsub_endpoint_pt
        pub->destAddr.sin_family = AF_INET;
        pub->destAddr.sin_addr.s_addr = inet_addr(bindIP);
        pub->destAddr.sin_port = htons(port);
-       pub->serializerSvc = NULL;
+
+       pub->serializer = best_serializer;
 
        pubsub_topicPublicationAddPublisherEP(pub,pubEP);
 
@@ -127,6 +128,7 @@ celix_status_t pubsub_topicPublicationCreate(int 
sendSocket, pubsub_endpoint_pt
 
 celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
        celix_status_t status = CELIX_SUCCESS;
+
        celixThreadMutex_lock(&(pub->tp_lock));
 
        free(pub->endpoint);
@@ -134,14 +136,18 @@ celix_status_t 
pubsub_topicPublicationDestroy(topic_publication_pt pub){
 
        hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices);
        while(hashMapIterator_hasNext(iter)){
-               publish_bundle_bound_service_t* bound = 
hashMapIterator_nextValue(iter);
+               publish_bundle_bound_service_pt bound = 
hashMapIterator_nextValue(iter);
                pubsub_destroyPublishBundleBoundService(bound);
        }
        hashMapIterator_destroy(iter);
        hashMap_destroy(pub->boundServices,false,false);
 
        pub->svcFactoryReg = NULL;
-       status = close(pub->sendSocket);
+       pub->serializer = NULL;
+
+       if(close(pub->sendSocket) != 0){
+               status = CELIX_FILE_IO_EXCEPTION;
+       }
 
        celixThreadMutex_unlock(&(pub->tp_lock));
 
@@ -156,7 +162,6 @@ celix_status_t 
pubsub_topicPublicationStart(bundle_context_pt bundle_context,top
        celix_status_t status = CELIX_SUCCESS;
 
        /* Let's register the new service */
-       //celixThreadMutex_lock(&(pub->tp_lock));
 
        pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0);
 
@@ -167,39 +172,29 @@ celix_status_t 
pubsub_topicPublicationStart(bundle_context_pt bundle_context,top
                factory->ungetService = pubsub_topicPublicationUngetService;
 
                properties_pt props = properties_create();
-        properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope);
+               properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope);
                properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic);
 
                status = 
bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
 
                if(status != CELIX_SUCCESS){
                        properties_destroy(props);
-                       printf("PSA: Cannot register ServiceFactory for topic 
%s, topic %s (bundle %ld).\n",pubEP->scope, pubEP->topic,pubEP->serviceID);
+                       printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot register 
ServiceFactory for topic %s, topic %s (bundle %ld).\n",pubEP->scope, 
pubEP->topic,pubEP->serviceID);
                }
                else{
                        *svcFactory = factory;
                }
        }
        else{
-               printf("PSA: Cannot find pubsub_endpoint after adding 
it...Should never happen!\n");
+               printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot find pubsub_endpoint 
after adding it...Should never happen!\n");
                status = CELIX_SERVICE_EXCEPTION;
        }
 
-       //celixThreadMutex_unlock(&(pub->tp_lock));
-
        return status;
 }
 
 celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
-       celix_status_t status = CELIX_SUCCESS;
-
-       //celixThreadMutex_lock(&(pub->tp_lock));
-
-       status = serviceRegistration_unregister(pub->svcFactoryReg);
-
-       //celixThreadMutex_unlock(&(pub->tp_lock));
-
-       return status;
+       return serviceRegistration_unregister(pub->svcFactoryReg);
 }
 
 celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep){
@@ -221,66 +216,12 @@ celix_status_t 
pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub
        return CELIX_SUCCESS;
 }
 
-celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, 
pubsub_serializer_service_t* serializerSvc){
-       celix_status_t status = CELIX_SUCCESS;
-
+array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt 
pub){
+       array_list_pt list = NULL;
        celixThreadMutex_lock(&(pub->tp_lock));
-
-    //clear old serializer
-    if (pub->serializerSvc != NULL) {
-        hash_map_iterator_t iter = 
hashMapIterator_construct(pub->boundServices); //key = bundle_pt, 
publish_bundle_bound_service_t*
-        while (hashMapIterator_hasNext(&iter)) {
-            publish_bundle_bound_service_t* bound = 
hashMapIterator_nextValue(&iter);
-            celixThreadMutex_lock(&bound->mp_lock);
-                       
pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, 
bound->map);
-            celixThreadMutex_unlock(&bound->mp_lock);
-                       bound->map = NULL;
-        }
-    }
-
-    //setup new serializer
-       pub->serializerSvc = serializerSvc;
-    if (pub->serializerSvc != NULL) {
-        hash_map_iterator_t iter = 
hashMapIterator_construct(pub->boundServices);
-        while (hashMapIterator_hasNext(&iter)) {
-            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
-            bundle_pt bundle = hashMapEntry_getKey(entry);
-            publish_bundle_bound_service_t *bound = 
hashMapEntry_getValue(entry);
-            celixThreadMutex_lock(&bound->mp_lock);
-            
pub->serializerSvc->createSerializerMap(pub->serializerSvc->handle, bundle, 
&bound->map);
-            celixThreadMutex_unlock(&bound->mp_lock);
-        }
-    }
-
+       list = arrayList_clone(pub->pub_ep_list);
        celixThreadMutex_unlock(&(pub->tp_lock));
-
-       return status;
-}
-
-celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt 
pub, pubsub_serializer_service_t* svc){
-       celix_status_t status = CELIX_SUCCESS;
-
-       celixThreadMutex_lock(&(pub->tp_lock));
-    if (pub->serializerSvc == svc) {
-        hash_map_iterator_t iter = 
hashMapIterator_construct(pub->boundServices);
-        while (hashMapIterator_hasNext(&iter)) {
-            publish_bundle_bound_service_t* bound = 
hashMapIterator_nextValue(&iter);
-            celixThreadMutex_lock(&bound->mp_lock);
-            
pub->serializerSvc->destroySerializerMap(pub->serializerSvc->handle, 
bound->map);
-            celixThreadMutex_unlock(&bound->mp_lock);
-                       bound->map = NULL;
-        }
-
-        pub->serializerSvc = NULL;
-    }
-
-       celixThreadMutex_unlock(&(pub->tp_lock));
-
-       return status;
-}
-
-array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt 
pub){
-       return pub->pub_ep_list;
+       return list;
 }
 
 
@@ -291,19 +232,19 @@ static celix_status_t 
pubsub_topicPublicationGetService(void* handle, bundle_pt
 
        celixThreadMutex_lock(&(publish->tp_lock));
 
-       publish_bundle_bound_service_t* bound = 
hashMap_get(publish->boundServices, bundle);
-       if (bound == NULL) {
-               bound = pubsub_createPublishBundleBoundService(publish, bundle);
-               if (bound != NULL) {
-                       hashMap_put(publish->boundServices, bundle, bound);
+       publish_bundle_bound_service_pt bound = 
(publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
+       if(bound==NULL){
+               bound = pubsub_createPublishBundleBoundService(publish,bundle);
+               if(bound!=NULL){
+                       hashMap_put(publish->boundServices,bundle,bound);
                }
        }
-       else {
+       else{
                bound->getCount++;
        }
 
        if (bound != NULL) {
-               *service = &bound->pubSvc;
+               *service = &bound->service;
        }
 
        celixThreadMutex_unlock(&(publish->tp_lock));
@@ -317,19 +258,20 @@ static celix_status_t 
pubsub_topicPublicationUngetService(void* handle, bundle_p
 
        celixThreadMutex_lock(&(publish->tp_lock));
 
-       publish_bundle_bound_service_t* bound = 
hashMap_get(publish->boundServices, bundle);
-       if (bound != NULL) {
+       publish_bundle_bound_service_pt bound = 
(publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
+       if(bound!=NULL){
 
                bound->getCount--;
-               if (bound->getCount == 0) {
+               if(bound->getCount==0){
                        pubsub_destroyPublishBundleBoundService(bound);
                        hashMap_remove(publish->boundServices,bundle);
                }
+
        }
-       else {
+       else{
                long bundleId = -1;
                bundle_getBundleId(bundle,&bundleId);
-               printf("TP: Unexpected ungetService call for bundle %ld.\n", 
bundleId);
+               printf("PSA_UDP_MC_TP: Unexpected ungetService call for bundle 
%ld.\n", bundleId);
        }
 
        /* service should be never used for unget, so let's set the pointer to 
NULL */
@@ -340,7 +282,7 @@ static celix_status_t 
pubsub_topicPublicationUngetService(void* handle, bundle_p
        return CELIX_SUCCESS;
 }
 
-static bool send_pubsub_msg(publish_bundle_bound_service_t* bound, 
pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback){
+static bool send_pubsub_msg(publish_bundle_bound_service_pt bound, 
pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback){
        const int iovec_len = 3; // header + size + payload
        bool ret = true;
 
@@ -357,50 +299,36 @@ static bool 
send_pubsub_msg(publish_bundle_bound_service_t* bound, pubsub_msg_t*
        delay_first_send_for_late_joiners();
 
        if(largeUdp_sendmsg(bound->largeUdpHandle, bound->parent->sendSocket, 
msg_iovec, iovec_len, 0, &bound->parent->destAddr, 
sizeof(bound->parent->destAddr)) == -1) {
-           fprintf(stderr, "Socket: %d, size: %i",bound->parent->sendSocket, 
compiledMsgSize);
-           perror("send_pubsub_msg:sendSocket");
-           ret = false;
+               fprintf(stderr, "Socket: %d, size: 
%i",bound->parent->sendSocket, compiledMsgSize);
+               perror("send_pubsub_msg:sendSocket");
+               ret = false;
        }
 
        if(releaseCallback) {
-           releaseCallback->release(msg->payload, bound);
+               releaseCallback->release(msg->payload, bound);
        }
        return ret;
-}
 
+}
 
-static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, 
const void *msg) {
-    int status = 0;
-    publish_bundle_bound_service_t* bound = handle;
 
-    celixThreadMutex_lock(&(bound->parent->tp_lock));
-    celixThreadMutex_lock(&(bound->mp_lock));
+static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, 
const void *inMsg) {
+       int status = 0;
+       publish_bundle_bound_service_pt bound = 
(publish_bundle_bound_service_pt) handle;
 
-    pubsub_msg_serializer_t *msgSer = NULL;
-    if (bound->map != NULL) {
-        msgSer = hashMap_get(bound->map->serializers, (void 
*)(uintptr_t)msgTypeId);
-    }
+       celixThreadMutex_lock(&(bound->parent->tp_lock));
+       celixThreadMutex_lock(&(bound->mp_lock));
 
-    if (bound->map == NULL) {
-        printf("TP: Serializer is not set!\n");
-        status = 1;
-    } else if (msgSer == NULL ){
-        printf("TP: No msg serializer available for msg type id %d\n", 
msgTypeId);
-        hash_map_iterator_t iter = 
hashMapIterator_construct(bound->map->serializers);
-        printf("Note supported messages:\n");
-        while (hashMapIterator_hasNext(&iter)) {
-            pubsub_msg_serializer_t *msgSer = hashMapIterator_nextValue(&iter);
-            printf("\tmsg %s with id %d\n", msgSer->msgName, msgSer->msgId);
-        }
-        status = 1;
-    }
+       pubsub_msg_serializer_t* msgSer = 
(pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, 
(void*)(uintptr_t)msgTypeId);
 
-    int major=0, minor=0;
+       if (msgSer != NULL) {
+               int major=0, minor=0;
 
-    if (status == 0 && msgSer != NULL) {
                pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct 
pubsub_msg_header));
                strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
                msg_hdr->type = msgTypeId;
+
+
                if (msgSer->msgVersion != NULL){
                        version_getMajor(msgSer->msgVersion, &major);
                        version_getMinor(msgSer->msgVersion, &minor);
@@ -408,15 +336,16 @@ static int pubsub_topicPublicationSend(void* handle, 
unsigned int msgTypeId, con
                        msg_hdr->minor = minor;
                }
 
-               char* serializedOutput = NULL;
+               void* serializedOutput = NULL;
                size_t serializedOutputLen = 0;
-        msgSer->serialize(msgSer->handle, msg, &serializedOutput, 
&serializedOutputLen);
+               msgSer->serialize(msgSer,inMsg,&serializedOutput, 
&serializedOutputLen);
 
-               pubsub_msg_t* msg = calloc(1,sizeof(struct pubsub_msg));
+               pubsub_msg_t *msg = calloc(1,sizeof(pubsub_msg_t));
                msg->header = msg_hdr;
-               msg->payload = serializedOutput;
+               msg->payload = (char *)serializedOutput;
                msg->payloadSize = serializedOutputLen;
 
+
                if(send_pubsub_msg(bound, msg,true, NULL) == false) {
                        status = -1;
                }
@@ -424,38 +353,21 @@ static int pubsub_topicPublicationSend(void* handle, 
unsigned int msgTypeId, con
                free(msg);
                free(serializedOutput);
 
-    }
 
-    celixThreadMutex_unlock(&(bound->mp_lock));
+       } else {
+               printf("PSA_UDP_MC_TP: No msg serializer available for msg type 
id %d\n", msgTypeId);
+               status=-1;
+       }
+
+       celixThreadMutex_unlock(&(bound->mp_lock));
        celixThreadMutex_unlock(&(bound->parent->tp_lock));
 
-    return status;
+       return status;
 }
 
-static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, 
unsigned int* out){
-    publish_bundle_bound_service_t* bound = handle;
-    unsigned int msgTypeId = 0;
-
-    celixThreadMutex_lock(&bound->mp_lock);
-    if (bound->map != NULL) {
-        hash_map_iterator_t iter = 
hashMapIterator_construct(bound->map->serializers);
-        while (hashMapIterator_hasNext(&iter)) {
-            pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter);
-            if (strncmp(msgType, msgSer->msgName, 1024*1024) == 0) {
-                msgTypeId = msgSer->msgId;
-                break;
-            }
-        }
-    }
-    celixThreadMutex_unlock(&bound->mp_lock);
-
-    if (msgTypeId != 0) {
-        *out = msgTypeId;
-        return 0;
-    } else {
-        printf("TP: Cannot find msg type id for msg type %s\n", msgType);
-        return 1;
-    }
+static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, 
unsigned int* msgTypeId){
+       *msgTypeId = utils_stringHash(msgType);
+       return 0;
 }
 
 
@@ -466,58 +378,49 @@ static unsigned int rand_range(unsigned int min, unsigned 
int max){
 
 }
 
-static publish_bundle_bound_service_t* 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle) {
-    //PRECOND lock on publish->tp_lock
-    publish_bundle_bound_service_t* bound = calloc(1, sizeof(*bound));
+static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle){
+
+       publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
 
        if (bound != NULL) {
+
                bound->parent = tp;
                bound->bundle = bundle;
                bound->getCount = 1;
-               bound->mp_send_in_progress = false;
                celixThreadMutex_create(&bound->mp_lock,NULL);
-               arrayList_create(&bound->mp_parts);
+
+               if(tp->serializer != NULL){
+                       
tp->serializer->createSerializerMap(tp->serializer->handle,bundle,&bound->msgTypes);
+               }
 
                pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
                bound->scope=strdup(pubEP->scope);
                bound->topic=strdup(pubEP->topic);
                bound->largeUdpHandle = largeUdp_create(1);
-               bound->pubSvc.handle = bound;
-               bound->pubSvc.localMsgTypeIdForMsgType = 
pubsub_localMsgTypeIdForUUID;
-               bound->pubSvc.send = pubsub_topicPublicationSend;
-               bound->pubSvc.sendMultipart = NULL;  //Multipart not supported 
(jet) for UDP
 
-       if (tp->serializerSvc != NULL) {
-            tp->serializerSvc->createSerializerMap(tp->serializerSvc->handle, 
bundle, &bound->map);
-               }
-       }
-       else
-       {
-               free(bound);
-               return NULL;
+               bound->service.handle = bound;
+               bound->service.localMsgTypeIdForMsgType = 
pubsub_localMsgTypeIdForUUID;
+               bound->service.send = pubsub_topicPublicationSend;
+               bound->service.sendMultipart = NULL;  //Multipart not supported 
for UDP
+
        }
 
        return bound;
 }
 
-static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_t* 
boundSvc) {
-    //PRECOND lock on publish->tp_lock
-       celixThreadMutex_lock(&boundSvc->mp_lock);
+static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt 
boundSvc){
 
-    if (boundSvc->map != NULL && boundSvc->parent->serializerSvc != NULL) {
-       
boundSvc->parent->serializerSvc->destroySerializerMap(boundSvc->parent->serializerSvc->handle,
 boundSvc->map);
-       boundSvc->map = NULL;
-    }
+       celixThreadMutex_lock(&boundSvc->mp_lock);
 
-       if (boundSvc->mp_parts!=NULL) {
-               arrayList_destroy(boundSvc->mp_parts);
+       if(boundSvc->parent->serializer != NULL && boundSvc->msgTypes != NULL){
+               
boundSvc->parent->serializer->destroySerializerMap(boundSvc->parent->serializer->handle,
 boundSvc->msgTypes);
        }
 
-    if (boundSvc->scope!=NULL) {
-        free(boundSvc->scope);
-    }
+       if(boundSvc->scope!=NULL){
+               free(boundSvc->scope);
+       }
 
-    if (boundSvc->topic!=NULL) {
+       if(boundSvc->topic!=NULL){
                free(boundSvc->topic);
        }
 
@@ -535,7 +438,7 @@ static void delay_first_send_for_late_joiners(){
        static bool firstSend = true;
 
        if(firstSend){
-               printf("TP: Delaying first send for late joiners...\n");
+               printf("PSA_UDP_MC_TP: Delaying first send for late 
joiners...\n");
                sleep(FIRST_SEND_DELAY);
                firstSend = false;
        }

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c 
b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
index 7a3f5a9..5896264 100644
--- a/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
+++ b/pubsub/pubsub_admin_udp_mc/private/src/topic_subscription.c
@@ -29,35 +29,31 @@
 #include <unistd.h>
 #include <signal.h>
 
+#include <sys/types.h>
 #include <sys/socket.h>
+#include <sys/epoll.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
 
-#if defined(__APPLE__) && defined(__MACH__)
-       #include <sys/event.h>
-       #include <sys/time.h>
-#else
-       #include <sys/epoll.h>
-#endif
-
 #include "utils.h"
 #include "celix_errno.h"
 #include "constants.h"
 #include "version.h"
 
 #include "topic_subscription.h"
+#include "topic_publication.h"
 #include "subscriber.h"
 #include "publisher.h"
-#include "pubsub_publish_service_private.h"
 #include "large_udp.h"
 
+#include "pubsub_serializer.h"
+
 #define MAX_EPOLL_EVENTS        10
 #define RECV_THREAD_TIMEOUT     5
 #define UDP_BUFFER_SIZE         65535
 #define MAX_UDP_SESSIONS        16
 
-struct topic_subscription {
-
+struct topic_subscription{
        char* ifIpAddress;
        service_tracker_pt tracker;
        array_list_pt sub_ep_list;
@@ -65,25 +61,24 @@ struct topic_subscription {
        bool running;
        celix_thread_mutex_t ts_lock;
        bundle_context_pt context;
-       int topicEpollFd; // EPOLL filedescriptor where the sockets are 
registered.
 
-    //NOTE. using a service ptr can be dangerous, because pointer can be 
reused.
-    //ensuring that pointer are removed before new (refurbish) pionter comes 
along is crucial!
-       hash_map_pt msgSerializerMapMap; // key = service ptr, value = 
pubsub_msg_serializer_map_t*
-    hash_map_pt bundleMap; //key = service ptr, value = bundle_pt
+       pubsub_serializer_service_t *serializer;
 
+       int topicEpollFd; // EPOLL filedescriptor where the sockets are 
registered.
+       hash_map_pt servicesMap; // key = service, value = msg types map
        hash_map_pt socketMap; // key = URL, value = listen-socket
+
+       celix_thread_mutex_t pendingConnections_lock;
+       array_list_pt pendingConnections;
+
+       array_list_pt pendingDisconnections;
+       celix_thread_mutex_t pendingDisconnections_lock;
+
+       //array_list_pt rawServices;
        unsigned int nrSubscribers;
        largeUdp_pt largeUdpHandle;
-       pubsub_serializer_service_t* serializerSvc;
-
 };
 
-typedef struct mp_handle{
-       hash_map_pt svc_msg_db;
-       hash_map_pt rcv_msg_map;
-}* mp_handle_pt;
-
 typedef struct msg_map_entry{
        bool retain;
        void* msgInst;
@@ -95,9 +90,11 @@ static void* udp_recv_thread_func(void* arg);
 static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr);
 static void sigusr1_sighandler(int signo);
 static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, 
unsigned int* msgTypeId);
+static void connectPendingPublishers(topic_subscription_pt sub);
+static void disconnectPendingPublishers(topic_subscription_pt sub);
 
 
-celix_status_t pubsub_topicSubscriptionCreate(char* ifIp,bundle_context_pt 
bundle_context, pubsub_serializer_service_t* serializer, char* scope, char* 
topic,topic_subscription_pt* out){
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt 
bundle_context, char* ifIp,char* scope, char* topic 
,pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out){
        celix_status_t status = CELIX_SUCCESS;
 
        topic_subscription_pt ts = (topic_subscription_pt) 
calloc(1,sizeof(*ts));
@@ -109,35 +106,39 @@ celix_status_t pubsub_topicSubscriptionCreate(char* 
ifIp,bundle_context_pt bundl
        ts->topicEpollFd = epoll_create1(0);
 #endif
        if(ts->topicEpollFd == -1) {
-           status += CELIX_SERVICE_EXCEPTION;
+               status += CELIX_SERVICE_EXCEPTION;
        }
 
        ts->running = false;
        ts->nrSubscribers = 0;
-       ts->serializerSvc = NULL;
+       ts->serializer = best_serializer;
 
        celixThreadMutex_create(&ts->ts_lock,NULL);
        arrayList_create(&ts->sub_ep_list);
-       ts->msgSerializerMapMap = hashMap_create(NULL, NULL, NULL, NULL);
-    ts->bundleMap = hashMap_create(NULL, NULL, NULL, NULL);
+       ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL);
        ts->socketMap =  hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
 
+       arrayList_create(&ts->pendingConnections);
+       arrayList_create(&ts->pendingDisconnections);
+       celixThreadMutex_create(&ts->pendingConnections_lock, NULL);
+       celixThreadMutex_create(&ts->pendingDisconnections_lock, NULL);
+
        ts->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS);
 
        char filter[128];
        memset(filter,0,128);
        if(strncmp(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, scope, 
strlen(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT)) == 0) {
-        // default scope, means that subscriber has not defined a scope 
property
-        snprintf(filter, 128, "(&(%s=%s)(%s=%s))",
-                (char*) OSGI_FRAMEWORK_OBJECTCLASS, 
PUBSUB_SUBSCRIBER_SERVICE_NAME,
-                PUBSUB_SUBSCRIBER_TOPIC,topic);
-
-    } else {
-        snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))",
-                (char*) OSGI_FRAMEWORK_OBJECTCLASS, 
PUBSUB_SUBSCRIBER_SERVICE_NAME,
-                PUBSUB_SUBSCRIBER_TOPIC,topic,
-                PUBSUB_SUBSCRIBER_SCOPE,scope);
-    }
+               // default scope, means that subscriber has not defined a scope 
property
+               snprintf(filter, 128, "(&(%s=%s)(%s=%s))",
+                               (char*) OSGI_FRAMEWORK_OBJECTCLASS, 
PUBSUB_SUBSCRIBER_SERVICE_NAME,
+                               PUBSUB_SUBSCRIBER_TOPIC,topic);
+
+       } else {
+               snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))",
+                               (char*) OSGI_FRAMEWORK_OBJECTCLASS, 
PUBSUB_SUBSCRIBER_SERVICE_NAME,
+                               PUBSUB_SUBSCRIBER_TOPIC,topic,
+                               PUBSUB_SUBSCRIBER_SCOPE,scope);
+       }
 
        service_tracker_customizer_pt customizer = NULL;
        status += 
serviceTrackerCustomizer_create(ts,NULL,topicsub_subscriberTracked,NULL,topicsub_subscriberUntracked,&customizer);
@@ -151,10 +152,9 @@ celix_status_t pubsub_topicSubscriptionCreate(char* 
ifIp,bundle_context_pt bundl
 
        sigaction(SIGUSR1,&actions,NULL);
 
-    if (status == CELIX_SUCCESS) {
-        *out=ts;
-        pubsub_topicSubscriptionSetSerializer(ts, serializer);
-    }
+       if (status == CELIX_SUCCESS) {
+               *out=ts;
+       }
 
        return status;
 }
@@ -168,10 +168,20 @@ celix_status_t 
pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
        serviceTracker_destroy(ts->tracker);
        arrayList_clear(ts->sub_ep_list);
        arrayList_destroy(ts->sub_ep_list);
-       hashMap_destroy(ts->msgSerializerMapMap, false, false);
-    hashMap_destroy(ts->bundleMap, false, false);
+       hashMap_destroy(ts->servicesMap,false,false);
+
+       hashMap_destroy(ts->socketMap,true,true);
+
+       celixThreadMutex_lock(&ts->pendingConnections_lock);
+       arrayList_destroy(ts->pendingConnections);
+       celixThreadMutex_unlock(&ts->pendingConnections_lock);
+       celixThreadMutex_destroy(&ts->pendingConnections_lock);
+
+       celixThreadMutex_lock(&ts->pendingDisconnections_lock);
+       arrayList_destroy(ts->pendingDisconnections);
+       celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
+       celixThreadMutex_destroy(&ts->pendingDisconnections_lock);
 
-       hashMap_destroy(ts->socketMap,false,false);
        largeUdp_destroy(ts->largeUdpHandle);
 #if defined(__APPLE__) && defined(__MACH__)
        //TODO: Use kqueue for OSX
@@ -211,15 +221,16 @@ celix_status_t 
pubsub_topicSubscriptionStop(topic_subscription_pt ts){
 
        celixThread_join(ts->recv_thread,NULL);
 
-    status = serviceTracker_close(ts->tracker);
+       status = serviceTracker_close(ts->tracker);
 
-    hash_map_iterator_pt it = hashMapIterator_create(ts->socketMap);
-    while(hashMapIterator_hasNext(it)) {
-        hash_map_entry_pt entry = hashMapIterator_nextEntry(it);
-        char *url = hashMapEntry_getKey(entry);
-        pubsub_topicSubscriptionDisconnectPublisher(ts, url);
-    }
-    hashMapIterator_destroy(it);
+       hash_map_iterator_pt it = hashMapIterator_create(ts->socketMap);
+       while(hashMapIterator_hasNext(it)) {
+               hash_map_entry_pt entry = hashMapIterator_nextEntry(it);
+               char *url = hashMapEntry_getKey(entry);
+               pubsub_topicSubscriptionDisconnectPublisher(ts, url);
+               free(url);
+       }
+       hashMapIterator_destroy(it);
 
 
        return status;
@@ -227,108 +238,126 @@ celix_status_t 
pubsub_topicSubscriptionStop(topic_subscription_pt ts){
 
 celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt 
ts, char* pubURL) {
 
-    printf("pubsub_topicSubscriptionConnectPublisher : pubURL = %s\n", pubURL);
+       printf("pubsub_topicSubscriptionConnectPublisher : pubURL = %s\n", 
pubURL);
 
-    celix_status_t status = CELIX_SUCCESS;
+       celix_status_t status = CELIX_SUCCESS;
+       celixThreadMutex_lock(&ts->ts_lock);
 
-    if (!hashMap_containsKey(ts->socketMap, pubURL)){
+       if(hashMap_containsKey(ts->socketMap, pubURL)){
+               printf("PSA_UDM_MC_TS: PubURL %s already existing!\n",pubURL);
+               celixThreadMutex_unlock(&ts->ts_lock);
+               return CELIX_SERVICE_EXCEPTION;
+       }
 
-               celixThreadMutex_lock(&ts->ts_lock);
+       int *recvSocket = calloc(sizeof(int), 1);
+       *recvSocket = socket(AF_INET, SOCK_DGRAM, 0);
+       if (*recvSocket < 0) {
+               perror("pubsub_topicSubscriptionCreate:socket");
+               status = CELIX_SERVICE_EXCEPTION;
+       }
 
-               int *recvSocket = calloc(sizeof(int), 1);
-               *recvSocket = socket(AF_INET, SOCK_DGRAM, 0);
-               if (*recvSocket < 0) {
-                       perror("pubsub_topicSubscriptionCreate:socket");
+       if (status == CELIX_SUCCESS){
+               int reuse = 1;
+               if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) 
&reuse, sizeof(reuse)) != 0) {
+                       perror("setsockopt() SO_REUSEADDR");
+                       status = CELIX_SERVICE_EXCEPTION;
+               }
+       }
+
+       if(status == CELIX_SUCCESS){
+               // TODO Check if there is a better way to parse the URL to 
IP/Portnr
+               //replace ':' by spaces
+               char *url = strdup(pubURL);
+               char *pt = url;
+               while((pt=strchr(pt, ':')) != NULL) {
+                       *pt = ' ';
+               }
+               char mcIp[100];
+               unsigned short mcPort;
+               sscanf(url, "udp //%s %hu", mcIp, &mcPort);
+               free (url);
+
+               printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, 
Port = %hu\n", mcIp, mcPort);
+
+               struct ip_mreq mc_addr;
+               mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp);
+               mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress);
+               printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress);
+               if (setsockopt(*recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, 
(char*) &mc_addr, sizeof(mc_addr)) != 0) {
+                       perror("setsockopt() IP_ADD_MEMBERSHIP");
                        status = CELIX_SERVICE_EXCEPTION;
                }
 
                if (status == CELIX_SUCCESS){
-                       int reuse = 1;
-                       if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, 
(char*) &reuse, sizeof(reuse)) != 0) {
-                               perror("setsockopt() SO_REUSEADDR");
+                       struct sockaddr_in mcListenAddr;
+                       mcListenAddr.sin_family = AF_INET;
+                       mcListenAddr.sin_addr.s_addr = INADDR_ANY;
+                       mcListenAddr.sin_port = htons(mcPort);
+                       if(bind(*recvSocket, (struct sockaddr*)&mcListenAddr, 
sizeof(mcListenAddr)) != 0) {
+                               perror("bind()");
                                status = CELIX_SERVICE_EXCEPTION;
                        }
                }
 
                if (status == CELIX_SUCCESS){
-                       // TODO Check if there is a better way to parse the URL 
to IP/Portnr
-                       //replace ':' by spaces
-                       char *url = strdup(pubURL);
-                       char *pt = url;
-                       while((pt=strchr(pt, ':')) != NULL) {
-                               *pt = ' ';
-                       }
-                       char mcIp[100];
-                       unsigned short mcPort;
-                       sscanf(url, "udp //%s %hu", mcIp, &mcPort);
-                       free (url);
-
-                       printf("pubsub_topicSubscriptionConnectPublisher : IP = 
%s, Port = %hu\n", mcIp, mcPort);
-
-                       struct ip_mreq mc_addr;
-                       mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp);
-                       mc_addr.imr_interface.s_addr = 
inet_addr(ts->ifIpAddress);
-                       printf("Adding MC %s at interface %s\n", mcIp, 
ts->ifIpAddress);
-                       if (setsockopt(*recvSocket, IPPROTO_IP, 
IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) {
-                               perror("setsockopt() IP_ADD_MEMBERSHIP");
+#if defined(__APPLE__) && defined(__MACH__)
+                       //TODO: Use kqueue for OSX
+#else
+                       struct epoll_event ev;
+                       memset(&ev, 0, sizeof(ev));
+                       ev.events = EPOLLIN;
+                       ev.data.fd = *recvSocket;
+                       if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, 
*recvSocket, &ev) == -1) {
+                               perror("epoll_ctl() EPOLL_CTL_ADD");
                                status = CELIX_SERVICE_EXCEPTION;
                        }
+#endif
+               }
 
-                       if (status == CELIX_SUCCESS){
-                               struct sockaddr_in mcListenAddr;
-                               mcListenAddr.sin_family = AF_INET;
-                               mcListenAddr.sin_addr.s_addr = INADDR_ANY;
-                               mcListenAddr.sin_port = htons(mcPort);
-                               if(bind(*recvSocket, (struct 
sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) {
-                                       perror("bind()");
-                                       status = CELIX_SERVICE_EXCEPTION;
-                               }
-                       }
-
-                       if (status == CELIX_SUCCESS){
-                               #if defined(__APPLE__) && defined(__MACH__)
-                                       //TODO: Use kqueue for OSX
-                               #else
-                                       struct epoll_event ev;
-                                       memset(&ev, 0, sizeof(ev));
-                                       ev.events = EPOLLIN;
-                                       ev.data.fd = *recvSocket;
-                                       if(epoll_ctl(ts->topicEpollFd, 
EPOLL_CTL_ADD, *recvSocket, &ev) == -1) {
-                                               perror("epoll_ctl() 
EPOLL_CTL_ADD");
-                                               status = 
CELIX_SERVICE_EXCEPTION;
-                                       }
-                               #endif
-                       }
+       }
 
-               }
+       if (status == CELIX_SUCCESS){
+               hashMap_put(ts->socketMap, strdup(pubURL), (void*)recvSocket);
+       }else{
+               free(recvSocket);
+       }
 
-               if (status == CELIX_SUCCESS){
-                       hashMap_put(ts->socketMap, pubURL, (void*)recvSocket);
-               }else{
-                       free(recvSocket);
-               }
+       celixThreadMutex_unlock(&ts->ts_lock);
 
-               celixThreadMutex_unlock(&ts->ts_lock);
+       return status;
+}
 
-    }
+celix_status_t 
pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt 
ts, char* pubURL) {
+       celix_status_t status = CELIX_SUCCESS;
+       char *url = strdup(pubURL);
+       celixThreadMutex_lock(&ts->pendingConnections_lock);
+       arrayList_add(ts->pendingConnections, url);
+       celixThreadMutex_unlock(&ts->pendingConnections_lock);
+       return status;
+}
 
-    return status;
+celix_status_t 
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt
 ts, char* pubURL) {
+       celix_status_t status = CELIX_SUCCESS;
+       char *url = strdup(pubURL);
+       celixThreadMutex_lock(&ts->pendingDisconnections_lock);
+       arrayList_add(ts->pendingDisconnections, url);
+       celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
+       return status;
 }
 
 celix_status_t 
pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* 
pubURL){
-    printf("pubsub_topicSubscriptionDisconnectPublisher : pubURL = %s\n", 
pubURL);
-    celix_status_t status = CELIX_SUCCESS;
+       printf("pubsub_topicSubscriptionDisconnectPublisher : pubURL = %s\n", 
pubURL);
+       celix_status_t status = CELIX_SUCCESS;
+       struct epoll_event ev;
+       memset(&ev, 0, sizeof(ev));
 
-    if (hashMap_containsKey(ts->socketMap, pubURL)){
+       celixThreadMutex_lock(&ts->ts_lock);
+
+       if (hashMap_containsKey(ts->socketMap, pubURL)){
 
 #if defined(__APPLE__) && defined(__MACH__)
-    //TODO: Use kqueue for OSX
+               //TODO: Use kqueue for OSX
 #else
-               struct epoll_event ev;
-               memset(&ev, 0, sizeof(ev));
-
-               celixThreadMutex_lock(&ts->ts_lock);
-
                int *s = hashMap_remove(ts->socketMap, pubURL);
                if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_DEL, *s, &ev) == -1) {
                        printf("in if error()\n");
@@ -336,11 +365,11 @@ celix_status_t 
pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt
                        status = CELIX_SERVICE_EXCEPTION;
                }
                free(s);
-
-               celixThreadMutex_unlock(&ts->ts_lock);
 #endif
 
-    }
+       }
+
+       celixThreadMutex_unlock(&ts->ts_lock);
 
        return status;
 }
@@ -349,9 +378,7 @@ celix_status_t 
pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, p
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&ts->ts_lock);
-
        arrayList_add(ts->sub_ep_list,subEP);
-
        celixThreadMutex_unlock(&ts->ts_lock);
 
        return status;
@@ -362,9 +389,7 @@ celix_status_t 
pubsub_topicIncreaseNrSubscribers(topic_subscription_pt ts) {
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&ts->ts_lock);
-
        ts->nrSubscribers++;
-
        celixThreadMutex_unlock(&ts->ts_lock);
 
        return status;
@@ -374,22 +399,17 @@ celix_status_t 
pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&ts->ts_lock);
-
        arrayList_removeElement(ts->sub_ep_list,subEP);
-
        celixThreadMutex_unlock(&ts->ts_lock);
 
        return status;
-
 }
 
 celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt ts) {
        celix_status_t status = CELIX_SUCCESS;
 
        celixThreadMutex_lock(&ts->ts_lock);
-
        ts->nrSubscribers--;
-
        celixThreadMutex_unlock(&ts->ts_lock);
 
        return status;
@@ -399,153 +419,118 @@ unsigned int 
pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
        return ts->nrSubscribers;
 }
 
-celix_status_t pubsub_topicSubscriptionSetSerializer(topic_subscription_pt ts, 
pubsub_serializer_service_t* serializerSvc) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       celixThreadMutex_lock(&ts->ts_lock);
-    //clear old
-    if (ts->serializerSvc != NULL) {
-        hash_map_iterator_t iter = 
hashMapIterator_construct(ts->msgSerializerMapMap);
-        while (hashMapIterator_hasNext(&iter)) {
-            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
-            pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry);
-            pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
-            ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, 
map);
-            hashMap_put(ts->msgSerializerMapMap, subsvc, NULL);
-
-        }
-    }
-       ts->serializerSvc = serializerSvc;
-    //init new
-    if (ts->serializerSvc != NULL) {
-        hash_map_iterator_t iter = 
hashMapIterator_construct(ts->msgSerializerMapMap);
-        while (hashMapIterator_hasNext(&iter)) {
-            pubsub_subscriber_t* subsvc = hashMapIterator_nextKey(&iter);
-            bundle_pt bundle = hashMap_get(ts->bundleMap, subsvc);
-            pubsub_msg_serializer_map_t* map = NULL;
-            ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, 
bundle, &map);
-            hashMap_put(ts->msgSerializerMapMap, subsvc, map);
-        }
-    }
-       celixThreadMutex_unlock(&ts->ts_lock);
-
-       return status;
+array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt 
sub){
+       return sub->sub_ep_list;
 }
 
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt 
ts, pubsub_serializer_service_t* serializerSvc){
-       celix_status_t status = CELIX_SUCCESS;
-
-       celixThreadMutex_lock(&ts->ts_lock);
-       if (ts->serializerSvc == serializerSvc) { //only act if svc removed is 
services used
-               hash_map_iterator_t iter = 
hashMapIterator_construct(ts->msgSerializerMapMap);
-               while (hashMapIterator_hasNext(&iter)) {
-            hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
-            pubsub_subscriber_t* subsvc = hashMapEntry_getKey(entry);
-            pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
-            ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, 
map);
-            hashMap_put(ts->msgSerializerMapMap, subsvc, NULL);
-               }
-               ts->serializerSvc = NULL;
-       }
-       celixThreadMutex_unlock(&ts->ts_lock);
-
-       return status;
-}
 
-static celix_status_t topicsub_subscriberTracked(void * handle, 
service_reference_pt reference, void* svc){
+static celix_status_t topicsub_subscriberTracked(void * handle, 
service_reference_pt reference, void * service){
        celix_status_t status = CELIX_SUCCESS;
        topic_subscription_pt ts = handle;
 
        celixThreadMutex_lock(&ts->ts_lock);
-       if (!hashMap_containsKey(ts->msgSerializerMapMap, svc)) {
+       if (!hashMap_containsKey(ts->servicesMap, service)) {
                bundle_pt bundle = NULL;
+               hash_map_pt msgTypes = NULL;
+
                serviceReference_getBundle(reference, &bundle);
 
-               if (ts->serializerSvc != NULL) {
-            pubsub_msg_serializer_map_t* map = NULL;
-            ts->serializerSvc->createSerializerMap(ts->serializerSvc->handle, 
bundle, &map);
-            if (map != NULL) {
-                hashMap_put(ts->msgSerializerMapMap, svc, map);
-                hashMap_put(ts->bundleMap, svc, bundle);
-            }
+               if(ts->serializer != NULL && bundle!=NULL){
+                       
ts->serializer->createSerializerMap(ts->serializer->handle,bundle,&msgTypes);
+                       if(msgTypes != NULL){
+                               hashMap_put(ts->servicesMap, service, msgTypes);
+                               printf("PSA_UDP_MC_TS: New subscriber 
registered.\n");
+                       }
+               }
+               else{
+                       printf("PSA_UDP_MC_TS: Cannot register new 
subscriber.\n");
+                       status = CELIX_SERVICE_EXCEPTION;
                }
        }
        celixThreadMutex_unlock(&ts->ts_lock);
-       printf("TS: New subscriber registered.\n");
+
        return status;
 
 }
 
-static celix_status_t topicsub_subscriberUntracked(void * handle, 
service_reference_pt reference, void* svc){
+static celix_status_t topicsub_subscriberUntracked(void * handle, 
service_reference_pt reference, void * service){
        celix_status_t status = CELIX_SUCCESS;
        topic_subscription_pt ts = handle;
 
-
-    celixThreadMutex_lock(&ts->ts_lock);
-       if (hashMap_containsKey(ts->msgSerializerMapMap, svc)) {
-               pubsub_msg_serializer_map_t* map = 
hashMap_remove(ts->msgSerializerMapMap, svc);
-               if (ts->serializerSvc != NULL){
-                       
ts->serializerSvc->destroySerializerMap(ts->serializerSvc->handle, map);
-            hashMap_remove(ts->bundleMap, svc);
-            hashMap_remove(ts->msgSerializerMapMap, svc);
+       celixThreadMutex_lock(&ts->ts_lock);
+       if (hashMap_containsKey(ts->servicesMap, service)) {
+               hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service);
+               if(msgTypes!=NULL && ts->serializer!=NULL){
+                       
ts->serializer->destroySerializerMap(ts->serializer->handle,msgTypes);
+                       printf("PSA_ZMQ_TS: Subscriber unregistered.\n");
+               }
+               else{
+                       printf("PSA_ZMQ_TS: Cannot unregister subscriber.\n");
+                       status = CELIX_SERVICE_EXCEPTION;
                }
        }
        celixThreadMutex_unlock(&ts->ts_lock);
 
-       printf("TS: Subscriber unregistered.\n");
+       printf("PSA_UDP_MC_TS: Subscriber unregistered.\n");
        return status;
 }
 
 
-static void process_msg(topic_subscription_pt sub, pubsub_udp_msg_t* msg){
+static void process_msg(topic_subscription_pt sub,pubsub_udp_msg_t *msg){
 
-       hash_map_iterator_t iter = 
hashMapIterator_construct(sub->msgSerializerMapMap);
        celixThreadMutex_lock(&sub->ts_lock);
-       while (hashMapIterator_hasNext(&iter)) {
-               hash_map_entry_pt entry = hashMapIterator_nextEntry(&iter);
+       hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap);
+       while (hashMapIterator_hasNext(iter)) {
+               hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
                pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry);
-               pubsub_msg_serializer_map_t* map = hashMapEntry_getValue(entry);
-
-               pubsub_msg_serializer_t* msgSer = hashMap_get(map->serializers, 
(void *)(uintptr_t )msg->header.type);
+               hash_map_pt msgTypes = hashMapEntry_getValue(entry);
 
+               pubsub_msg_serializer_t *msgSer = 
hashMap_get(msgTypes,(void*)(uintptr_t )msg->header.type);
                if (msgSer == NULL) {
-                       printf("TS: Primary message %d not supported. NOT 
receiving any part of the whole message.\n",msg->header.type);
-               } else {
+                       printf("PSA_UDP_MC_TS: Serializer not available for 
message %d.\n",msg->header.type);
+               }
+               else{
                        void *msgInst = NULL;
-                       bool validVersion = checkVersion(msgSer->msgVersion, 
&msg->header);
+                       bool validVersion = 
checkVersion(msgSer->msgVersion,&msg->header);
+
                        if(validVersion){
-                celix_status_t status = msgSer->deserialize(msgSer->handle, 
msg->payload, 0, &msgInst);
+
+                               celix_status_t status = 
msgSer->deserialize(msgSer, (const void *) msg->payload, 0, &msgInst);
+
                                if (status == CELIX_SUCCESS) {
                                        bool release = true;
                                        pubsub_multipart_callbacks_t 
mp_callbacks;
-                                       mp_callbacks.handle = map;
+                                       mp_callbacks.handle = sub;
                                        mp_callbacks.localMsgTypeIdForMsgType = 
pubsub_localMsgTypeIdForMsgType;
                                        mp_callbacks.getMultipart = NULL;
 
                                        subsvc->receive(subsvc->handle, 
msgSer->msgName, msg->header.type, msgInst, &mp_callbacks, &release);
-                                       if (release) {
-                        msgSer->freeMsg(msgSer->handle, msgInst);
+
+                                       if(release){
+                                               msgSer->freeMsg(msgSer,msgInst);
                                        }
                                }
                                else{
-                                       printf("TS: Cannot deserialize msgType 
%s.\n", msgSer->msgName);
+                                       printf("PSA_UDP_MC_TS: Cannot 
deserialize msgType %s.\n",msgSer->msgName);
                                }
 
                        }
-                       else {
+                       else{
                                int major=0,minor=0;
-                               version_getMajor(msgSer->msgVersion, &major);
-                               version_getMinor(msgSer->msgVersion, &minor);
-                               printf("TS: Version mismatch for primary 
message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole 
message.\n",
-                       msgSer->msgName, major, minor, msg->header.major, 
msg->header.minor);
+                               version_getMajor(msgSer->msgVersion,&major);
+                               version_getMinor(msgSer->msgVersion,&minor);
+                               printf("PSA_UDP_MC_TS: Version mismatch for 
primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the 
whole message.\n",
+                                               
msgSer->msgName,major,minor,msg->header.major,msg->header.minor);
                        }
+
                }
        }
+       hashMapIterator_destroy(iter);
        celixThreadMutex_unlock(&sub->ts_lock);
 }
 
 static void* udp_recv_thread_func(void * arg) {
-    topic_subscription_pt sub = (topic_subscription_pt) arg;
+       topic_subscription_pt sub = (topic_subscription_pt) arg;
 
 #if defined(__APPLE__) && defined(__MACH__)
     //TODO: use kqueue for OSX
@@ -558,52 +543,68 @@ static void* udp_recv_thread_func(void * arg) {
                }
     }
 #else
+       struct epoll_event events[MAX_EPOLL_EVENTS];
+
+       while (sub->running) {
+               int nfds = epoll_wait(sub->topicEpollFd, events, 
MAX_EPOLL_EVENTS, RECV_THREAD_TIMEOUT * 1000);
+               int i;
+               for(i = 0; i < nfds; i++ ) {
+                       unsigned int index;
+                       unsigned int size;
+                       if(largeUdp_dataAvailable(sub->largeUdpHandle, 
events[i].data.fd, &index, &size) == true) {
+                               // Handle data
+                               pubsub_udp_msg_t *udpMsg = NULL;
+                               if(largeUdp_read(sub->largeUdpHandle, index, 
(void**)&udpMsg, size) != 0) {
+                                       printf("PSA_UDP_MC_TS: ERROR 
largeUdp_read with index %d\n", index);
+                                       continue;
+                               }
 
-    struct epoll_event events[MAX_EPOLL_EVENTS];
+                               process_msg(sub, udpMsg);
 
-    while (sub->running) {
-        int nfds = epoll_wait(sub->topicEpollFd, events, MAX_EPOLL_EVENTS, 
RECV_THREAD_TIMEOUT * 1000);
-        int i;
-        for(i = 0; i < nfds; i++ ) {
-            unsigned int index;
-            unsigned int size;
-            if(largeUdp_dataAvailable(sub->largeUdpHandle, events[i].data.fd, 
&index, &size) == true) {
-                // Handle data
-                pubsub_udp_msg_t* udpMsg = NULL;
-                if(largeUdp_read(sub->largeUdpHandle, index, (void**)&udpMsg, 
size) != 0) {
-                       printf("TS: ERROR largeUdp_read with index %d\n", 
index);
-                       continue;
-                }
-
-                if (udpMsg->header.type == 0){
-                       //Raw msg, since raw messages are not supported, don't 
do anything.
-                }else{
-                       process_msg(sub, udpMsg);
-                }
-
-                free(udpMsg);
-            }
-        }
-    }
+                               free(udpMsg);
+                       }
+               }
+               connectPendingPublishers(sub);
+               disconnectPendingPublishers(sub);
+       }
 #endif
 
-    return NULL;
+       return NULL;
 }
 
+static void connectPendingPublishers(topic_subscription_pt sub) {
+       celixThreadMutex_lock(&sub->pendingConnections_lock);
+       while(!arrayList_isEmpty(sub->pendingConnections)) {
+               char * pubEP = arrayList_remove(sub->pendingConnections, 0);
+               pubsub_topicSubscriptionConnectPublisher(sub, pubEP);
+               free(pubEP);
+       }
+       celixThreadMutex_unlock(&sub->pendingConnections_lock);
+}
 
-static void sigusr1_sighandler(int signo) {
-       printf("TS: Topic subscription being shut down...\n");
+static void disconnectPendingPublishers(topic_subscription_pt sub) {
+       celixThreadMutex_lock(&sub->pendingDisconnections_lock);
+       while(!arrayList_isEmpty(sub->pendingDisconnections)) {
+               char * pubEP = arrayList_remove(sub->pendingDisconnections, 0);
+               pubsub_topicSubscriptionDisconnectPublisher(sub, pubEP);
+               free(pubEP);
+       }
+       celixThreadMutex_unlock(&sub->pendingDisconnections_lock);
+}
+
+static void sigusr1_sighandler(int signo){
+       printf("PSA_UDP_MC_TS: Topic subscription being shut down...\n");
        return;
 }
 
-static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr) {
+static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){
        bool check=false;
        int major=0,minor=0;
 
-       if (msgVersion!=NULL) {
+       if(msgVersion!=NULL){
                version_getMajor(msgVersion,&major);
                version_getMinor(msgVersion,&minor);
-               if (hdr->major==((unsigned char)major)) { /* Different major 
means incompatible */
+               if(hdr->major==((unsigned char)major)){ /* Different major 
means incompatible */
                        check = (hdr->minor>=((unsigned char)minor)); /* 
Compatible only if the provider has a minor equals or greater (means compatible 
update) */
                }
        }
@@ -611,24 +612,7 @@ static bool checkVersion(version_pt 
msgVersion,pubsub_msg_header_pt hdr) {
        return check;
 }
 
-static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, 
unsigned int* out) {
-    pubsub_msg_serializer_map_t* map = handle;
-    hash_map_iterator_t iter = hashMapIterator_construct(map->serializers);
-    unsigned int msgTypeId = 0;
-    while (hashMapIterator_hasNext(&iter)) {
-        pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter);
-        if (strncmp(msgSer->msgName, msgType, 1024 * 1024) == 0) {
-            msgTypeId = msgSer->msgId;
-            break;
-        }
-    }
-
-    if (msgTypeId == 0) {
-        printf("Cannot find msg type id for msgType %s\n", msgType);
-        return -1;
-    } else {
-        *out = msgTypeId;
-        return 0;
-    }
+static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, 
unsigned int* msgTypeId){
+       *msgTypeId = utils_stringHash(msgType);
+       return 0;
 }
-

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/CMakeLists.txt 
b/pubsub/pubsub_admin_zmq/CMakeLists.txt
index 49eba87..8c3c727 100644
--- a/pubsub/pubsub_admin_zmq/CMakeLists.txt
+++ b/pubsub/pubsub_admin_zmq/CMakeLists.txt
@@ -53,10 +53,11 @@ if (BUILD_PUBSUB_PSA_ZMQ)
                
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
                
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/log_helper.c
                
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
+          
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_admin_match.c
        )
 
        set_target_properties(org.apache.celix.pubsub_admin.PubSubAdminZmq 
PROPERTIES INSTALL_RPATH "$ORIGIN")
-       target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminZmq 
celix_framework celix_utils celix_dfi ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} 
${JANSSON_LIBRARIES} ${OPENSSL_CRYPTO_LIBRARY})
+       target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminZmq 
celix_framework celix_utils celix_dfi ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} 
${OPENSSL_CRYPTO_LIBRARY})
        install_bundle(org.apache.celix.pubsub_admin.PubSubAdminZmq)
 
 endif()

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h 
b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
index 7e7ac42..3a39a93 100644
--- a/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
+++ b/pubsub/pubsub_admin_zmq/private/include/pubsub_admin_impl.h
@@ -24,8 +24,8 @@
  *  \copyright Apache License, Version 2.0
  */
 
-#ifndef PUBSUB_ADMIN_IMPL_H_
-#define PUBSUB_ADMIN_IMPL_H_
+#ifndef PUBSUB_ADMIN_ZMQ_IMPL_H_
+#define PUBSUB_ADMIN_ZMQ_IMPL_H_
 
 #include <czmq.h>
 /* The following undefs prevent the collision between:
@@ -38,7 +38,7 @@
 #undef LOG_WARNING
 
 #include "pubsub_admin.h"
-#include "pubsub_serializer.h"
+#include "pubsub_admin_match.h"
 #include "log_helper.h"
 
 #define PSA_ZMQ_BASE_PORT "PSA_ZMQ_BASE_PORT"
@@ -47,13 +47,17 @@
 #define PSA_ZMQ_DEFAULT_BASE_PORT 5501
 #define PSA_ZMQ_DEFAULT_MAX_PORT 6000
 
-struct pubsub_admin {
+#define PUBSUB_ADMIN_TYPE      "zmq"
 
-       pubsub_serializer_service_t* serializerSvc;
+struct pubsub_admin {
 
        bundle_context_pt bundle_context;
        log_helper_pt loghelper;
 
+       /* List of the available serializers */
+       celix_thread_mutex_t serializerListLock; // List<serializers>
+       array_list_pt serializerList;
+
        celix_thread_mutex_t localPublicationsLock;
        hash_map_pt localPublications;//<topic(string),service_factory_pt>
 
@@ -64,9 +68,17 @@ struct pubsub_admin {
        hash_map_pt subscriptions; //<topic(string),topic_subscription>
 
        celix_thread_mutex_t pendingSubscriptionsLock;
-       celix_thread_mutexattr_t pendingSubscriptionsAttr;
        hash_map_pt pendingSubscriptions; //<topic(string),List<pubsub_ep>>
 
+       /* Those are used to keep track of valid subscriptions/publications 
that still have no valid serializer */
+       celix_thread_mutex_t noSerializerPendingsLock;
+       array_list_pt noSerializerSubscriptions; // List<pubsub_ep>
+       array_list_pt noSerializerPublications; // List<pubsub_ep>
+
+       celix_thread_mutex_t usedSerializersLock;
+       hash_map_pt topicSubscriptionsPerSerializer; // 
<serializer,List<topicSubscription>>
+       hash_map_pt topicPublicationsPerSerializer; // 
<serializer,List<topicPublications>>
+
        char* ipAddress;
 
        zactor_t* zmq_auth;
@@ -75,11 +87,6 @@ struct pubsub_admin {
     unsigned int maxPort;
 };
 
-/* Note: correct locking order is
- * 1. subscriptionsLock
- * 2. publications locks
- */
-
 celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt 
*admin);
 celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin);
 
@@ -92,10 +99,9 @@ celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt 
admin,pubsub_endpoi
 celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char* 
scope, char* topic);
 celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* 
scope,char* topic);
 
-celix_status_t pubsubAdmin_matchPublisher(pubsub_admin_pt admin, 
pubsub_endpoint_pt pubEP, double* score);
-celix_status_t pubsubAdmin_matchSubscriber(pubsub_admin_pt admin, 
pubsub_endpoint_pt subEP, double* score);
+celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt 
reference, void * service);
+celix_status_t pubsubAdmin_serializerRemoved(void * handle, 
service_reference_pt reference, void * service);
 
-celix_status_t pubsubAdmin_setSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_t* serializerSvc);
-celix_status_t pubsubAdmin_removeSerializer(pubsub_admin_pt admin, 
pubsub_serializer_service_t* serializerSvc);
+celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, 
pubsub_endpoint_pt endpoint, double* score);
 
-#endif /* PUBSUB_ADMIN_IMPL_H_ */
+#endif /* PUBSUB_ADMIN_ZMQ_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
----------------------------------------------------------------------
diff --git 
a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h 
b/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
deleted file mode 100644
index dbd2ff1..0000000
--- a/pubsub/pubsub_admin_zmq/private/include/pubsub_publish_service_private.h
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_publish_service_private.h
- *
- *  \date       Sep 24, 2015
- *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
- *  \copyright Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_PUBLISH_SERVICE_PRIVATE_H_
-#define PUBSUB_PUBLISH_SERVICE_PRIVATE_H_
-
-#include "publisher.h"
-#include "pubsub_endpoint.h"
-#include "pubsub_common.h"
-#include "pubsub_serializer.h"
-
-typedef struct topic_publication *topic_publication_pt;
-
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt 
bundle_context,pubsub_endpoint_pt pubEP, char* bindIP, unsigned int basePort, 
unsigned int maxPort, topic_publication_pt *out);
-celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
-
-celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep);
-celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep);
-
-celix_status_t pubsub_topicPublicationSetSerializer(topic_publication_pt pub, 
pubsub_serializer_service_t* serializerSvc);
-celix_status_t pubsub_topicPublicationRemoveSerializer(topic_publication_pt 
pub, pubsub_serializer_service_t* serializerSvc);
-
-celix_status_t pubsub_topicPublicationStart(bundle_context_pt 
bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
-celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);
-
-array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt 
pub);
-
-#endif /* PUBSUB_PUBLISH_SERVICE_PRIVATE_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/private/include/topic_publication.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/topic_publication.h 
b/pubsub/pubsub_admin_zmq/private/include/topic_publication.h
new file mode 100644
index 0000000..3457263
--- /dev/null
+++ b/pubsub/pubsub_admin_zmq/private/include/topic_publication.h
@@ -0,0 +1,49 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * topic_publication.h
+ *
+ *  \date       Sep 24, 2015
+ *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#ifndef TOPIC_PUBLICATION_H_
+#define TOPIC_PUBLICATION_H_
+
+#include "publisher.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_common.h"
+
+#include "pubsub_serializer.h"
+
+typedef struct topic_publication *topic_publication_pt;
+
+celix_status_t pubsub_topicPublicationCreate(bundle_context_pt 
bundle_context,pubsub_endpoint_pt pubEP, pubsub_serializer_service_t 
*best_serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, 
topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
+
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep);
+celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep);
+
+celix_status_t pubsub_topicPublicationStart(bundle_context_pt 
bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
+celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);
+
+array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt 
pub);
+
+#endif /* TOPIC_PUBLICATION_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h 
b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
index c1e78c3..7267103 100644
--- a/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
+++ b/pubsub/pubsub_admin_zmq/private/include/topic_subscription.h
@@ -38,22 +38,21 @@
 
 typedef struct topic_subscription* topic_subscription_pt;
 
-celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt 
bundle_context, pubsub_endpoint_pt subEP, pubsub_serializer_service_t* 
serializer, char* scope, char* topic,topic_subscription_pt* out);
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt 
bundle_context,char* scope, char* topic, pubsub_serializer_service_t 
*best_serializer, topic_subscription_pt* out);
 celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
 celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
 
 celix_status_t 
pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt 
ts, char* pubURL);
 celix_status_t 
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt
 ts, char* pubURL);
+
 celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt 
ts, char* pubURL);
 celix_status_t 
pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* 
pubURL);
 
 celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, 
pubsub_endpoint_pt subEP);
 celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt 
ts, pubsub_endpoint_pt subEP);
 
-celix_status_t pubsub_topicSubscriptionSetSerializer(topic_subscription_pt ts, 
pubsub_serializer_service_t* serializerSvc);
-celix_status_t pubsub_topicSubscriptionRemoveSerializer(topic_subscription_pt 
ts, pubsub_serializer_service_t* serializerSvc);
-
+array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt 
sub);
 celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt 
subscription);
 celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt 
subscription);
 unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt subscription);

http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_admin_zmq/private/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/psa_activator.c 
b/pubsub/pubsub_admin_zmq/private/src/psa_activator.c
index cfe2c2e..fd07310 100644
--- a/pubsub/pubsub_admin_zmq/private/src/psa_activator.c
+++ b/pubsub/pubsub_admin_zmq/private/src/psa_activator.c
@@ -28,6 +28,7 @@
 
 #include "bundle_activator.h"
 #include "service_registration.h"
+#include "service_tracker.h"
 
 #include "pubsub_admin_impl.h"
 
@@ -36,6 +37,7 @@ struct activator {
        pubsub_admin_pt admin;
        pubsub_admin_service_pt adminService;
        service_registration_pt registration;
+       service_tracker_pt serializerTracker;
 };
 
 celix_status_t bundleActivator_create(bundle_context_pt context, void 
**userData) {
@@ -48,7 +50,28 @@ celix_status_t bundleActivator_create(bundle_context_pt 
context, void **userData
        }
        else{
                *userData = activator;
+
                status = pubsubAdmin_create(context, &(activator->admin));
+
+               if(status == CELIX_SUCCESS){
+                       service_tracker_customizer_pt customizer = NULL;
+                       status = 
serviceTrackerCustomizer_create(activator->admin,
+                                       NULL,
+                                       pubsubAdmin_serializerAdded,
+                                       NULL,
+                                       pubsubAdmin_serializerRemoved,
+                                       &customizer);
+                       if(status == CELIX_SUCCESS){
+                               status = serviceTracker_create(context, 
PUBSUB_SERIALIZER_SERVICE, customizer, &(activator->serializerTracker));
+                               if(status != CELIX_SUCCESS){
+                                       
serviceTrackerCustomizer_destroy(customizer);
+                                       pubsubAdmin_destroy(activator->admin);
+                               }
+                       }
+                       else{
+                               pubsubAdmin_destroy(activator->admin);
+                       }
+               }
        }
 
        return status;
@@ -74,16 +97,14 @@ celix_status_t bundleActivator_start(void * userData, 
bundle_context_pt context)
                pubsubAdminSvc->closeAllPublications = 
pubsubAdmin_closeAllPublications;
                pubsubAdminSvc->closeAllSubscriptions = 
pubsubAdmin_closeAllSubscriptions;
 
-               pubsubAdminSvc->matchPublisher = pubsubAdmin_matchPublisher;
-               pubsubAdminSvc->matchSubscriber = pubsubAdmin_matchSubscriber;
-
-               pubsubAdminSvc->setSerializer = pubsubAdmin_setSerializer;
-               pubsubAdminSvc->removeSerializer = pubsubAdmin_removeSerializer;
+               pubsubAdminSvc->matchEndpoint = pubsubAdmin_matchEndpoint;
 
                activator->adminService = pubsubAdminSvc;
 
                status = bundleContext_registerService(context, 
PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration);
 
+               status += serviceTracker_open(activator->serializerTracker);
+
        }
 
 
@@ -94,7 +115,9 @@ celix_status_t bundleActivator_stop(void * userData, 
bundle_context_pt context)
        celix_status_t status = CELIX_SUCCESS;
        struct activator *activator = userData;
 
-       serviceRegistration_unregister(activator->registration);
+       status += serviceTracker_close(activator->serializerTracker);
+       status += serviceRegistration_unregister(activator->registration);
+
        activator->registration = NULL;
 
        free(activator->adminService);
@@ -107,6 +130,7 @@ celix_status_t bundleActivator_destroy(void * userData, 
bundle_context_pt contex
        celix_status_t status = CELIX_SUCCESS;
        struct activator *activator = userData;
 
+       serviceTracker_destroy(activator->serializerTracker);
        pubsubAdmin_destroy(activator->admin);
        activator->admin = NULL;
 

Reply via email to