http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c 
b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c
new file mode 100644
index 0000000..6d0768b
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c
@@ -0,0 +1,635 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * topic_subscription.c
+ *
+ *  \date       Oct 2, 2015
+ *  \author            <a href="mailto:d...@celix.apache.org";>Apache Celix 
Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <signal.h>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "utils.h"
+#include "celix_errno.h"
+#include "constants.h"
+#include "version.h"
+
+#include "topic_subscription.h"
+#include "topic_publication.h"
+#include "pubsub/subscriber.h"
+#include "pubsub/publisher.h"
+#include "large_udp.h"
+
+#include "pubsub_serializer.h"
+
+#define MAX_EPOLL_EVENTS        10 /* capacity of the event array handed to epoll_wait() */
+#define RECV_THREAD_TIMEOUT     5 /* seconds; multiplied by 1000 for the epoll_wait() timeout */
+#define UDP_BUFFER_SIZE         65535 /* not referenced by the code in this file */
+#define MAX_UDP_SESSIONS        16 /* argument passed to largeUdp_create() */
+
+struct topic_subscription{ /* state of one topic subscription of the UDP multicast admin */
+       char* ifIpAddress; /* strdup'ed copy of the ifIp given at creation; used as multicast interface address */
+       service_tracker_pt tracker; /* tracker built from the subscriber filter at creation */
+       array_list_pt sub_ep_list; /* endpoints added/removed via Add/RemoveSubscriber, under ts_lock */
+       celix_thread_t recv_thread; /* thread running udp_recv_thread_func */
+       bool running; /* receive-loop flag: set by Start, cleared by Stop and Destroy */
+       celix_thread_mutex_t ts_lock; /* taken around sub_ep_list, servicesMap and nrSubscribers updates */
+       bundle_context_pt context;
+
+       pubsub_serializer_service_t *serializer; /* creates/destroys the per-subscriber msg types map */
+
+       int topicEpollFd; // EPOLL filedescriptor where the sockets are 
registered.
+       hash_map_pt servicesMap; // key = service, value = msg types map
+       hash_map_pt socketMap; // key = URL (strdup'ed), value = heap-allocated int holding the listen-socket
+       celix_thread_mutex_t socketMap_lock;
+
+       celix_thread_mutex_t pendingConnections_lock;
+       array_list_pt pendingConnections; /* strdup'ed URLs queued for connect; drained by the receive thread */
+
+       array_list_pt pendingDisconnections; /* strdup'ed URLs queued for disconnect; drained by the receive thread */
+       celix_thread_mutex_t pendingDisconnections_lock;
+
+       //array_list_pt rawServices;
+       unsigned int nrSubscribers; /* changed by pubsub_topicIncrease/DecreaseNrSubscribers */
+       largeUdp_pt largeUdpHandle; /* created with MAX_UDP_SESSIONS; used by the receive thread to read messages */
+};
+
+typedef struct msg_map_entry{ /* NOTE(review): not referenced by any other code in this file */
+       bool retain;
+       void* msgInst;
+}* msg_map_entry_pt;
+
+static celix_status_t topicsub_subscriberTracked(void * handle, 
service_reference_pt reference, void * service);
+static celix_status_t topicsub_subscriberUntracked(void * handle, 
service_reference_pt reference, void * service);
+static void* udp_recv_thread_func(void* arg);
+static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr);
+static void sigusr1_sighandler(int signo);
+static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, 
unsigned int* msgTypeId);
+static void connectPendingPublishers(topic_subscription_pt sub);
+static void disconnectPendingPublishers(topic_subscription_pt sub);
+
+
+/**
+ * Creates a topic subscription: allocates the object, the epoll instance
+ * (non-OSX only), the bookkeeping maps/lists and their mutexes, and a service
+ * tracker filtering on subscriber services for the given topic (and scope,
+ * unless the scope starts with PUBSUB_SUBSCRIBER_SCOPE_DEFAULT).
+ * Also installs a process-wide SIGUSR1 handler, which Stop uses to wake the
+ * receive thread.
+ *
+ * @param bundle_context  bundle context handed to the service tracker
+ * @param ifIp            interface IP address; copied
+ * @param scope           subscriber scope
+ * @param topic           topic name
+ * @param best_serializer serializer service stored in the subscription
+ * @param out             receives the new subscription on success only
+ * @return CELIX_SUCCESS, CELIX_ILLEGAL_ARGUMENT on NULL input, CELIX_ENOMEM on
+ *         allocation failure, or the status of the failing step. On failure
+ *         everything allocated here is released and *out is left untouched.
+ */
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt bundle_context, char* ifIp,char* scope, char* topic ,pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out){
+       celix_status_t status = CELIX_SUCCESS;
+
+       if (ifIp == NULL || scope == NULL || topic == NULL || out == NULL) {
+               return CELIX_ILLEGAL_ARGUMENT;
+       }
+
+       topic_subscription_pt ts = calloc(1,sizeof(*ts));
+       if (ts == NULL) {
+               return CELIX_ENOMEM;
+       }
+       ts->context = bundle_context;
+       ts->ifIpAddress = strdup(ifIp);
+       if (ts->ifIpAddress == NULL) {
+               free(ts);
+               return CELIX_ENOMEM;
+       }
+#if defined(__APPLE__) && defined(__MACH__)
+       //TODO: Use kqueue for OSX
+#else
+       ts->topicEpollFd = epoll_create1(0);
+#endif
+       if(ts->topicEpollFd == -1) {
+               status = CELIX_SERVICE_EXCEPTION;
+       }
+
+       ts->running = false;
+       ts->nrSubscribers = 0;
+       ts->serializer = best_serializer;
+
+       celixThreadMutex_create(&ts->ts_lock,NULL);
+       arrayList_create(&ts->sub_ep_list);
+       ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL);
+       ts->socketMap =  hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+
+       arrayList_create(&ts->pendingConnections);
+       arrayList_create(&ts->pendingDisconnections);
+       celixThreadMutex_create(&ts->pendingConnections_lock, NULL);
+       celixThreadMutex_create(&ts->pendingDisconnections_lock, NULL);
+       celixThreadMutex_create(&ts->socketMap_lock, NULL);
+
+       ts->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS);
+
+       char filter[128];
+       int written = 0;
+       memset(filter,0,sizeof(filter));
+       if(strncmp(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, scope, strlen(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT)) == 0) {
+               // default scope, means that subscriber has not defined a scope property
+               written = snprintf(filter, sizeof(filter), "(&(%s=%s)(%s=%s))",
+                               (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME,
+                               PUBSUB_SUBSCRIBER_TOPIC,topic);
+
+       } else {
+               written = snprintf(filter, sizeof(filter), "(&(%s=%s)(%s=%s)(%s=%s))",
+                               (char*) OSGI_FRAMEWORK_OBJECTCLASS, PUBSUB_SUBSCRIBER_SERVICE_NAME,
+                               PUBSUB_SUBSCRIBER_TOPIC,topic,
+                               PUBSUB_SUBSCRIBER_SCOPE,scope);
+       }
+       // A truncated filter has unbalanced parentheses; refuse it rather than hand it to the tracker.
+       if (written < 0 || (size_t) written >= sizeof(filter)) {
+               printf("PSA_UDP_MC_TS: Subscriber filter for topic %s does not fit in %zu bytes.\n", topic, sizeof(filter));
+               status = CELIX_SERVICE_EXCEPTION;
+       }
+
+       // NOTE(review): if the tracker creation below fails the customizer is not
+       // released here, because ownership on failure is not visible from this file.
+       service_tracker_customizer_pt customizer = NULL;
+       if (status == CELIX_SUCCESS) {
+               status = serviceTrackerCustomizer_create(ts,NULL,topicsub_subscriberTracked,NULL,topicsub_subscriberUntracked,&customizer);
+       }
+       if (status == CELIX_SUCCESS) {
+               status = serviceTracker_createWithFilter(bundle_context, filter, customizer, &ts->tracker);
+       }
+
+       if (status != CELIX_SUCCESS) {
+               // The tracker is the last fallible step, so it never needs to be destroyed here.
+               largeUdp_destroy(ts->largeUdpHandle);
+               arrayList_destroy(ts->pendingConnections);
+               arrayList_destroy(ts->pendingDisconnections);
+               hashMap_destroy(ts->socketMap,true,true);
+               hashMap_destroy(ts->servicesMap,false,false);
+               arrayList_destroy(ts->sub_ep_list);
+               celixThreadMutex_destroy(&ts->socketMap_lock);
+               celixThreadMutex_destroy(&ts->pendingConnections_lock);
+               celixThreadMutex_destroy(&ts->pendingDisconnections_lock);
+               celixThreadMutex_destroy(&ts->ts_lock);
+#if defined(__APPLE__) && defined(__MACH__)
+               //TODO: Use kqueue for OSX
+#else
+               if (ts->topicEpollFd != -1) {
+                       close(ts->topicEpollFd);
+               }
+#endif
+               free(ts->ifIpAddress);
+               free(ts);
+               return status;
+       }
+
+       // SIGUSR1 is only used to interrupt the blocking wait of the receive thread.
+       struct sigaction actions;
+       memset(&actions, 0, sizeof(actions));
+       sigemptyset(&actions.sa_mask);
+       actions.sa_flags = 0;
+       actions.sa_handler = sigusr1_sighandler;
+
+       sigaction(SIGUSR1,&actions,NULL);
+
+       *out=ts;
+
+       return status;
+}
+
+/**
+ * Releases everything owned by the subscription and frees the object itself.
+ * Sockets still registered in socketMap are closed, and URLs still queued in
+ * the pending connect/disconnect lists are freed, before their containers are
+ * destroyed.
+ *
+ * @param ts subscription to destroy; must not be used afterwards
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
+       celix_status_t status = CELIX_SUCCESS;
+
+       celixThreadMutex_lock(&ts->ts_lock);
+       ts->running = false;
+       free(ts->ifIpAddress);
+       serviceTracker_destroy(ts->tracker);
+       arrayList_clear(ts->sub_ep_list);
+       arrayList_destroy(ts->sub_ep_list);
+       hashMap_destroy(ts->servicesMap,false,false);
+
+       celixThreadMutex_lock(&ts->socketMap_lock);
+       // hashMap_destroy only frees the int holding the fd; close the fd first.
+       hash_map_iterator_pt it = hashMapIterator_create(ts->socketMap);
+       while(hashMapIterator_hasNext(it)) {
+               hash_map_entry_pt entry = hashMapIterator_nextEntry(it);
+               int *s = hashMapEntry_getValue(entry);
+               if (s != NULL) {
+                       close(*s);
+               }
+       }
+       hashMapIterator_destroy(it);
+       hashMap_destroy(ts->socketMap,true,true);
+       celixThreadMutex_unlock(&ts->socketMap_lock);
+       celixThreadMutex_destroy(&ts->socketMap_lock);
+
+       celixThreadMutex_lock(&ts->pendingConnections_lock);
+       // Queued URLs are strdup'ed copies owned by the list.
+       while(!arrayList_isEmpty(ts->pendingConnections)) {
+               free(arrayList_remove(ts->pendingConnections, 0));
+       }
+       arrayList_destroy(ts->pendingConnections);
+       celixThreadMutex_unlock(&ts->pendingConnections_lock);
+       celixThreadMutex_destroy(&ts->pendingConnections_lock);
+
+       celixThreadMutex_lock(&ts->pendingDisconnections_lock);
+       while(!arrayList_isEmpty(ts->pendingDisconnections)) {
+               free(arrayList_remove(ts->pendingDisconnections, 0));
+       }
+       arrayList_destroy(ts->pendingDisconnections);
+       celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
+       celixThreadMutex_destroy(&ts->pendingDisconnections_lock);
+
+       largeUdp_destroy(ts->largeUdpHandle);
+#if defined(__APPLE__) && defined(__MACH__)
+       //TODO: Use kqueue for OSX
+#else
+       close(ts->topicEpollFd);
+#endif
+
+       celixThreadMutex_unlock(&ts->ts_lock);
+
+       celixThreadMutex_destroy(&ts->ts_lock);
+
+       free(ts);
+
+       return status;
+}
+
+/**
+ * Opens the subscriber tracker, marks the subscription as running and, when
+ * the tracker opened successfully, starts the receive thread.
+ *
+ * @param ts subscription to start
+ * @return status of serviceTracker_open() if it failed, otherwise the status
+ *         of celixThread_create()
+ */
+celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts){
+       const celix_status_t openStatus = serviceTracker_open(ts->tracker);
+
+       // The flag is raised regardless of the tracker result, before the thread exists.
+       ts->running = true;
+
+       if(openStatus != CELIX_SUCCESS){
+               return openStatus;
+       }
+
+       return celixThread_create(&ts->recv_thread,NULL,udp_recv_thread_func,ts);
+}
+
+/**
+ * Stops the receive thread (woken with SIGUSR1 and joined), closes the
+ * tracker, and releases every listen socket: each one is removed from the
+ * epoll set and closed, and its map key and value are freed.
+ *
+ * @param ts subscription to stop
+ * @return status of serviceTracker_close(), increased by
+ *         CELIX_SERVICE_EXCEPTION for every socket that could not be removed
+ *         from the epoll set
+ */
+celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){
+       celix_status_t status = CELIX_SUCCESS;
+       struct epoll_event ev;
+       memset(&ev, 0, sizeof(ev));
+
+       ts->running = false;
+
+       // Interrupt a blocking epoll_wait() so the thread re-checks the running flag.
+       pthread_kill(ts->recv_thread.thread,SIGUSR1);
+
+       celixThread_join(ts->recv_thread,NULL);
+
+       status = serviceTracker_close(ts->tracker);
+
+       celixThreadMutex_lock(&ts->socketMap_lock);
+       hash_map_iterator_pt it = hashMapIterator_create(ts->socketMap);
+       while(hashMapIterator_hasNext(it)) {
+               hash_map_entry_pt entry = hashMapIterator_nextEntry(it);
+               char *url = hashMapEntry_getKey(entry);
+               int *s = hashMapEntry_getValue(entry);
+               memset(&ev, 0, sizeof(ev));
+               if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_DEL, *s, &ev) == -1) {
+                       printf("in if error()\n");
+                       perror("epoll_ctl() EPOLL_CTL_DEL");
+                       status += CELIX_SERVICE_EXCEPTION;
+               }
+               // Freeing the int alone would leak the descriptor.
+               close(*s);
+               free(s);
+               free(url);
+               //hashMapIterator_remove(it);
+       }
+       hashMapIterator_destroy(it);
+       // Keys and values were freed above, so only the entries are dropped here.
+       hashMap_clear(ts->socketMap, false, false);
+       celixThreadMutex_unlock(&ts->socketMap_lock);
+
+
+       return status;
+}
+
+/**
+ * Connects to a publisher given as "udp://<multicast-ip>:<port>": creates a
+ * UDP socket, joins the multicast group on the subscription's interface,
+ * binds to the port, registers the socket with the epoll instance (non-OSX
+ * only) and stores it in socketMap under a copy of the URL.
+ * A URL that is already present in socketMap is ignored.
+ *
+ * @param ts     subscription
+ * @param pubURL publisher URL; not retained
+ * @return CELIX_SUCCESS, CELIX_ENOMEM on allocation failure, or
+ *         CELIX_SERVICE_EXCEPTION when the URL cannot be parsed or a socket
+ *         call fails. On failure the socket is closed and nothing is stored.
+ */
+celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt ts, char* pubURL) {
+
+       printf("pubsub_topicSubscriptionConnectPublisher : pubURL = %s\n", pubURL);
+
+       celix_status_t status = CELIX_SUCCESS;
+
+       celixThreadMutex_lock(&ts->socketMap_lock);
+
+       if(!hashMap_containsKey(ts->socketMap, pubURL)){
+
+               char mcIp[100];
+               unsigned short mcPort = 0;
+               int *recvSocket = NULL;
+               char *key = NULL;
+
+               // TODO Check if there is a better way to parse the URL to IP/Portnr
+               //replace ':' by spaces
+               char *url = strdup(pubURL);
+               if (url == NULL) {
+                       status = CELIX_ENOMEM;
+               } else {
+                       char *pt = url;
+                       while((pt=strchr(pt, ':')) != NULL) {
+                               *pt = ' ';
+                       }
+                       // %99s keeps the address within mcIp; both fields must be present.
+                       if (sscanf(url, "udp //%99s %hu", mcIp, &mcPort) != 2) {
+                               printf("PSA_UDP_MC_TS: Cannot parse publisher URL %s\n", pubURL);
+                               status = CELIX_SERVICE_EXCEPTION;
+                       }
+                       free(url);
+               }
+
+               if (status == CELIX_SUCCESS) {
+                       recvSocket = calloc(1, sizeof(*recvSocket));
+                       key = strdup(pubURL);
+                       if (recvSocket == NULL || key == NULL) {
+                               status = CELIX_ENOMEM;
+                       } else {
+                               *recvSocket = socket(AF_INET, SOCK_DGRAM, 0);
+                               if (*recvSocket < 0) {
+                                       perror("pubsub_topicSubscriptionConnectPublisher:socket");
+                                       status = CELIX_SERVICE_EXCEPTION;
+                               }
+                       }
+               }
+
+               if (status == CELIX_SUCCESS){
+                       int reuse = 1;
+                       if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, sizeof(reuse)) != 0) {
+                               perror("setsockopt() SO_REUSEADDR");
+                               status = CELIX_SERVICE_EXCEPTION;
+                       }
+               }
+
+               if(status == CELIX_SUCCESS){
+                       printf("pubsub_topicSubscriptionConnectPublisher : IP = %s, Port = %hu\n", mcIp, mcPort);
+
+                       struct ip_mreq mc_addr;
+                       memset(&mc_addr, 0, sizeof(mc_addr));
+                       mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp);
+                       mc_addr.imr_interface.s_addr = inet_addr(ts->ifIpAddress);
+                       printf("Adding MC %s at interface %s\n", mcIp, ts->ifIpAddress);
+                       if (setsockopt(*recvSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) {
+                               perror("setsockopt() IP_ADD_MEMBERSHIP");
+                               status = CELIX_SERVICE_EXCEPTION;
+                       }
+               }
+
+               if (status == CELIX_SUCCESS){
+                       struct sockaddr_in mcListenAddr;
+                       memset(&mcListenAddr, 0, sizeof(mcListenAddr));
+                       mcListenAddr.sin_family = AF_INET;
+                       mcListenAddr.sin_addr.s_addr = INADDR_ANY;
+                       mcListenAddr.sin_port = htons(mcPort);
+                       if(bind(*recvSocket, (struct sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) {
+                               perror("bind()");
+                               status = CELIX_SERVICE_EXCEPTION;
+                       }
+               }
+
+               if (status == CELIX_SUCCESS){
+#if defined(__APPLE__) && defined(__MACH__)
+                       //TODO: Use kqueue for OSX
+#else
+                       struct epoll_event ev;
+                       memset(&ev, 0, sizeof(ev));
+                       ev.events = EPOLLIN;
+                       ev.data.fd = *recvSocket;
+                       if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, *recvSocket, &ev) == -1) {
+                               perror("epoll_ctl() EPOLL_CTL_ADD");
+                               status = CELIX_SERVICE_EXCEPTION;
+                       }
+#endif
+               }
+
+               if (status == CELIX_SUCCESS){
+                       // The map takes ownership of both the key copy and the socket holder.
+                       hashMap_put(ts->socketMap, key, (void*)recvSocket);
+               }
+               else{
+                       if (recvSocket != NULL && *recvSocket >= 0) {
+                               close(*recvSocket);
+                       }
+                       free(recvSocket);
+                       free(key);
+               }
+       }
+
+       celixThreadMutex_unlock(&ts->socketMap_lock);
+
+       return status;
+}
+
+/**
+ * Queues a copy of the publisher URL on the pending-connections list; the
+ * receive thread drains that list through connectPendingPublishers().
+ *
+ * @param ts     subscription
+ * @param pubURL publisher URL; copied with strdup()
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) {
+       char *queuedUrl = strdup(pubURL);
+
+       celixThreadMutex_lock(&ts->pendingConnections_lock);
+       arrayList_add(ts->pendingConnections, queuedUrl);
+       celixThreadMutex_unlock(&ts->pendingConnections_lock);
+
+       return CELIX_SUCCESS;
+}
+
+/**
+ * Queues a copy of the publisher URL on the pending-disconnections list; the
+ * receive thread drains that list through disconnectPendingPublishers().
+ *
+ * @param ts     subscription
+ * @param pubURL publisher URL; copied with strdup()
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt ts, char* pubURL) {
+       char *queuedUrl = strdup(pubURL);
+
+       celixThreadMutex_lock(&ts->pendingDisconnections_lock);
+       arrayList_add(ts->pendingDisconnections, queuedUrl);
+       celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
+
+       return CELIX_SUCCESS;
+}
+
+/**
+ * Disconnects from a publisher: removes its socket from socketMap and from
+ * the epoll set, closes the socket, and frees both the stored key and the
+ * socket holder. Unknown URLs are ignored.
+ *
+ * @param ts     subscription
+ * @param pubURL publisher URL as given to the connect call
+ * @return CELIX_SUCCESS, or CELIX_SERVICE_EXCEPTION when the socket could not
+ *         be removed from the epoll set (it is still closed and freed)
+ */
+celix_status_t pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* pubURL){
+       printf("pubsub_topicSubscriptionDisconnectPublisher : pubURL = %s\n", pubURL);
+       celix_status_t status = CELIX_SUCCESS;
+
+       celixThreadMutex_lock(&ts->socketMap_lock);
+
+       if (hashMap_containsKey(ts->socketMap, pubURL)){
+
+#if defined(__APPLE__) && defined(__MACH__)
+               //TODO: Use kqueue for OSX
+#else
+               // hashMap_remove() returns only the value, so look up the strdup'ed
+               // key first; otherwise it can no longer be freed.
+               char *storedKey = NULL;
+               hash_map_iterator_pt it = hashMapIterator_create(ts->socketMap);
+               while(storedKey == NULL && hashMapIterator_hasNext(it)) {
+                       hash_map_entry_pt entry = hashMapIterator_nextEntry(it);
+                       char *candidate = hashMapEntry_getKey(entry);
+                       if (candidate != NULL && strcmp(candidate, pubURL) == 0) {
+                               storedKey = candidate;
+                       }
+               }
+               hashMapIterator_destroy(it);
+
+               int *s = hashMap_remove(ts->socketMap, pubURL);
+               if (s != NULL) {
+                       struct epoll_event ev;
+                       memset(&ev, 0, sizeof(ev));
+                       if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_DEL, *s, &ev) == -1) {
+                               printf("in if error()\n");
+                               perror("epoll_ctl() EPOLL_CTL_DEL");
+                               status = CELIX_SERVICE_EXCEPTION;
+                       }
+                       // Freeing the int alone would leak the descriptor.
+                       close(*s);
+                       free(s);
+               }
+               free(storedKey);
+#endif
+
+       }
+
+       celixThreadMutex_unlock(&ts->socketMap_lock);
+
+       return status;
+}
+
+/**
+ * Appends a subscriber endpoint to the subscription's endpoint list while
+ * holding ts_lock. The endpoint pointer is stored as given, not copied.
+ *
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){
+       celixThreadMutex_lock(&ts->ts_lock);
+       arrayList_add(ts->sub_ep_list,subEP);
+       celixThreadMutex_unlock(&ts->ts_lock);
+
+       return CELIX_SUCCESS;
+}
+
+/**
+ * Increments the subscriber counter while holding ts_lock.
+ *
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt ts) {
+       celixThreadMutex_lock(&ts->ts_lock);
+       ts->nrSubscribers += 1;
+       celixThreadMutex_unlock(&ts->ts_lock);
+
+       return CELIX_SUCCESS;
+}
+
+/**
+ * Removes a subscriber endpoint from the subscription's endpoint list while
+ * holding ts_lock.
+ *
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt ts, pubsub_endpoint_pt subEP){
+       celixThreadMutex_lock(&ts->ts_lock);
+       arrayList_removeElement(ts->sub_ep_list,subEP);
+       celixThreadMutex_unlock(&ts->ts_lock);
+
+       return CELIX_SUCCESS;
+}
+
+/**
+ * Decrements the subscriber counter while holding ts_lock. The counter is
+ * unsigned, so a decrement at zero is ignored instead of wrapping around to
+ * UINT_MAX.
+ *
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt ts) {
+       celix_status_t status = CELIX_SUCCESS;
+
+       celixThreadMutex_lock(&ts->ts_lock);
+       if (ts->nrSubscribers > 0) {
+               ts->nrSubscribers--;
+       }
+       celixThreadMutex_unlock(&ts->ts_lock);
+
+       return status;
+}
+
+/** Returns the current subscriber counter; the read is done without taking ts_lock. */
+unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
+       const unsigned int count = ts->nrSubscribers;
+       return count;
+}
+
+/** Returns the subscription's own endpoint list (not a copy); no lock is taken. */
+array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt sub){
+       array_list_pt endpoints = sub->sub_ep_list;
+       return endpoints;
+}
+
+
+/**
+ * Tracker callback for a newly tracked subscriber service. For a service not
+ * yet in servicesMap, asks the serializer for a message-types map of the
+ * owning bundle and, if one is returned, stores it under the service pointer.
+ *
+ * @param handle    the topic subscription
+ * @param reference service reference, used to look up the owning bundle
+ * @param service   subscriber service instance (map key)
+ * @return CELIX_SERVICE_EXCEPTION when there is no serializer or no bundle,
+ *         otherwise CELIX_SUCCESS (also when no map was produced)
+ */
+static celix_status_t topicsub_subscriberTracked(void * handle, service_reference_pt reference, void * service){
+       topic_subscription_pt ts = handle;
+       celix_status_t result = CELIX_SUCCESS;
+
+       celixThreadMutex_lock(&ts->ts_lock);
+       if (!hashMap_containsKey(ts->servicesMap, service)) {
+               bundle_pt owner = NULL;
+
+               serviceReference_getBundle(reference, &owner);
+
+               if (ts->serializer == NULL || owner == NULL) {
+                       printf("PSA_UDP_MC_TS: Cannot register new subscriber.\n");
+                       result = CELIX_SERVICE_EXCEPTION;
+               }
+               else{
+                       hash_map_pt typeMap = NULL;
+                       ts->serializer->createSerializerMap(ts->serializer->handle,owner,&typeMap);
+                       if (typeMap != NULL) {
+                               hashMap_put(ts->servicesMap, service, typeMap);
+                               printf("PSA_UDP_MC_TS: New subscriber registered.\n");
+                       }
+               }
+       }
+       celixThreadMutex_unlock(&ts->ts_lock);
+
+       return result;
+
+}
+
+/**
+ * Tracker callback for a subscriber service that is no longer tracked.
+ * Removes the service from servicesMap and hands its message-types map back
+ * to the serializer for destruction.
+ *
+ * @param handle    the topic subscription
+ * @param reference service reference (unused)
+ * @param service   subscriber service instance (map key)
+ * @return CELIX_SERVICE_EXCEPTION when the stored map or the serializer is
+ *         missing, otherwise CELIX_SUCCESS (also for an unknown service)
+ */
+static celix_status_t topicsub_subscriberUntracked(void * handle, service_reference_pt reference, void * service){
+       celix_status_t status = CELIX_SUCCESS;
+       topic_subscription_pt ts = handle;
+
+       celixThreadMutex_lock(&ts->ts_lock);
+       if (hashMap_containsKey(ts->servicesMap, service)) {
+               hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service);
+               if(msgTypes!=NULL && ts->serializer!=NULL){
+                       ts->serializer->destroySerializerMap(ts->serializer->handle,msgTypes);
+                       // Logged only on success, with this admin's own prefix.
+                       printf("PSA_UDP_MC_TS: Subscriber unregistered.\n");
+               }
+               else{
+                       printf("PSA_UDP_MC_TS: Cannot unregister subscriber.\n");
+                       status = CELIX_SERVICE_EXCEPTION;
+               }
+       }
+       celixThreadMutex_unlock(&ts->ts_lock);
+
+       return status;
+}
+
+
+/**
+ * Delivers one received UDP message to every tracked subscriber service.
+ * For each service the serializer for the message's type id is looked up,
+ * the header version is checked against it, the payload is deserialized and
+ * the subscriber's receive callback is invoked. The deserialized instance is
+ * freed afterwards unless the subscriber cleared the release flag.
+ * The whole dispatch runs with ts_lock held.
+ */
+static void process_msg(topic_subscription_pt sub,pubsub_udp_msg_t *msg){
+
+       celixThreadMutex_lock(&sub->ts_lock);
+       hash_map_iterator_pt svcIter = hashMapIterator_create(sub->servicesMap);
+       while (hashMapIterator_hasNext(svcIter)) {
+               hash_map_entry_pt svcEntry = hashMapIterator_nextEntry(svcIter);
+               pubsub_subscriber_pt subscriber = hashMapEntry_getKey(svcEntry);
+               hash_map_pt typeMap = hashMapEntry_getValue(svcEntry);
+
+               pubsub_msg_serializer_t *ser = hashMap_get(typeMap,(void*)(uintptr_t )msg->header.type);
+               if (ser == NULL) {
+                       printf("PSA_UDP_MC_TS: Serializer not available for message %d.\n",msg->header.type);
+                       continue;
+               }
+
+               if (!checkVersion(ser->msgVersion,&msg->header)) {
+                       int haveMajor=0,haveMinor=0;
+                       version_getMajor(ser->msgVersion,&haveMajor);
+                       version_getMinor(ser->msgVersion,&haveMinor);
+                       printf("PSA_UDP_MC_TS: Version mismatch for primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the whole message.\n",
+                                       ser->msgName,haveMajor,haveMinor,msg->header.major,msg->header.minor);
+                       continue;
+               }
+
+               void *decoded = NULL;
+               celix_status_t rc = ser->deserialize(ser, (const void *) msg->payload, 0, &decoded);
+               if (rc != CELIX_SUCCESS) {
+                       printf("PSA_UDP_MC_TS: Cannot deserialize msgType %s.\n",ser->msgName);
+                       continue;
+               }
+
+               // The subscriber may clear this flag to keep the instance.
+               bool release = true;
+               pubsub_multipart_callbacks_t mp_callbacks;
+               mp_callbacks.handle = sub;
+               mp_callbacks.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForMsgType;
+               mp_callbacks.getMultipart = NULL;
+
+               subscriber->receive(subscriber->handle, ser->msgName, msg->header.type, decoded, &mp_callbacks, &release);
+
+               if(release){
+                       ser->freeMsg(ser,decoded);
+               }
+       }
+       hashMapIterator_destroy(svcIter);
+       celixThreadMutex_unlock(&sub->ts_lock);
+}
+
+/**
+ * Receive thread body. While the subscription is running it waits up to
+ * RECV_THREAD_TIMEOUT seconds for readable sockets, reads every complete
+ * message through the large-UDP handle and dispatches it with process_msg(),
+ * then processes the pending connect and disconnect requests.
+ * A failing or interrupted epoll_wait() returns a negative count, so the
+ * event loop body is skipped and only the pending requests are handled.
+ *
+ * @param arg the topic subscription
+ * @return always NULL
+ */
+static void* udp_recv_thread_func(void * arg) {
+       topic_subscription_pt sub = (topic_subscription_pt) arg;
+
+#if defined(__APPLE__) && defined(__MACH__)
+       //TODO: use kqueue for OSX
+       //struct kevent events[MAX_EPOLL_EVENTS];
+       while (sub->running) {
+               int nfds = 0;
+               if(nfds > 0) {
+                       pubsub_udp_msg_t* udpMsg = NULL;
+                       process_msg(sub, udpMsg);
+               }
+       }
+#else
+       struct epoll_event events[MAX_EPOLL_EVENTS];
+
+       while (sub->running) {
+               int nfds = epoll_wait(sub->topicEpollFd, events, MAX_EPOLL_EVENTS, RECV_THREAD_TIMEOUT * 1000);
+               int i;
+               for(i = 0; i < nfds; i++ ) {
+                       unsigned int index;
+                       unsigned int size;
+                       if(largeUdp_dataAvailable(sub->largeUdpHandle, events[i].data.fd, &index, &size) == true) {
+                               // Handle data
+                               pubsub_udp_msg_t *udpMsg = NULL;
+                               if(largeUdp_read(sub->largeUdpHandle, index, (void**)&udpMsg, size) != 0) {
+                                       // index is unsigned, so it is printed with %u
+                                       printf("PSA_UDP_MC_TS: ERROR largeUdp_read with index %u\n", index);
+                                       continue;
+                               }
+
+                               process_msg(sub, udpMsg);
+
+                               free(udpMsg);
+                       }
+               }
+               connectPendingPublishers(sub);
+               disconnectPendingPublishers(sub);
+       }
+#endif
+
+       return NULL;
+}
+
+/**
+ * Drains the pending-connections list: each queued URL is removed from the
+ * front, passed to pubsub_topicSubscriptionConnectPublisher() and then freed.
+ * pendingConnections_lock is held for the whole drain.
+ */
+static void connectPendingPublishers(topic_subscription_pt sub) {
+       celixThreadMutex_lock(&sub->pendingConnections_lock);
+       for (;;) {
+               if (arrayList_isEmpty(sub->pendingConnections)) {
+                       break;
+               }
+               char *queuedUrl = arrayList_remove(sub->pendingConnections, 0);
+               pubsub_topicSubscriptionConnectPublisher(sub, queuedUrl);
+               free(queuedUrl);
+       }
+       celixThreadMutex_unlock(&sub->pendingConnections_lock);
+}
+
+/**
+ * Drains the pending-disconnections list: each queued URL is removed from the
+ * front, passed to pubsub_topicSubscriptionDisconnectPublisher() and then
+ * freed. pendingDisconnections_lock is held for the whole drain.
+ */
+static void disconnectPendingPublishers(topic_subscription_pt sub) {
+       celixThreadMutex_lock(&sub->pendingDisconnections_lock);
+       for (;;) {
+               if (arrayList_isEmpty(sub->pendingDisconnections)) {
+                       break;
+               }
+               char *queuedUrl = arrayList_remove(sub->pendingDisconnections, 0);
+               pubsub_topicSubscriptionDisconnectPublisher(sub, queuedUrl);
+               free(queuedUrl);
+       }
+       celixThreadMutex_unlock(&sub->pendingDisconnections_lock);
+}
+
+/**
+ * SIGUSR1 handler. Its only job is to exist, so that the signal interrupts
+ * the receive thread's blocking wait instead of terminating the process.
+ * The notice is emitted with write(), because printf() is not
+ * async-signal-safe and must not be called from a signal handler.
+ */
+static void sigusr1_sighandler(int signo){
+       static const char notice[] = "PSA_UDP_MC_TS: Topic subscription being shut down...\n";
+       (void) signo;
+       // Best effort: a failed or short write is deliberately ignored.
+       ssize_t written = write(STDOUT_FILENO, notice, sizeof(notice) - 1);
+       (void) written;
+}
+
+/**
+ * Decides whether a received message header is compatible with the locally
+ * known message version: the major numbers must be equal and the received
+ * minor must be equal to or greater than the local minor. Local numbers are
+ * compared after conversion to unsigned char.
+ *
+ * @param msgVersion local message version; NULL yields false
+ * @param hdr        header of the received message
+ * @return true when compatible, false otherwise
+ */
+static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){
+       if (msgVersion == NULL) {
+               return false;
+       }
+
+       int localMajor = 0;
+       int localMinor = 0;
+       version_getMajor(msgVersion,&localMajor);
+       version_getMinor(msgVersion,&localMinor);
+
+       /* Different major means incompatible */
+       if (hdr->major != ((unsigned char)localMajor)) {
+               return false;
+       }
+
+       /* Compatible only if the provider has a minor equals or greater (means compatible update) */
+       return hdr->minor >= ((unsigned char)localMinor);
+}
+
+/**
+ * Multipart callback: derives the local message type id from the message
+ * type name by hashing it with utils_stringHash().
+ *
+ * @param handle    unused
+ * @param msgType   message type name
+ * @param msgTypeId receives the hash of msgType
+ * @return always 0
+ */
+static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId){
+       const unsigned int typeHash = utils_stringHash(msgType);
+       *msgTypeId = typeHash;
+       return 0;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h 
b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h
new file mode 100644
index 0000000..475416a
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h
@@ -0,0 +1,60 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * topic_subscription.h
+ *
+ *  \date       Sep 22, 2015
+ *  \author            <a href="mailto:d...@celix.apache.org";>Apache Celix 
Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#ifndef TOPIC_SUBSCRIPTION_H_
+#define TOPIC_SUBSCRIPTION_H_
+
+#include "celix_threads.h"
+#include "array_list.h"
+#include "celixbool.h"
+#include "service_tracker.h"
+
+#include "pubsub_endpoint.h"
+#include "pubsub_common.h"
+#include "pubsub_serializer.h"
+
+typedef struct topic_subscription* topic_subscription_pt;
+
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt 
bundle_context, char* ifIp,char* scope, char* topic 
,pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out);
+celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
+celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
+celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
+
+celix_status_t 
pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt 
ts, char* pubURL);
+celix_status_t 
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt
 ts, char* pubURL);
+
+celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt 
ts, char* pubURL);
+celix_status_t 
pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* 
pubURL);
+
+celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, 
pubsub_endpoint_pt subEP);
+celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt 
ts, pubsub_endpoint_pt subEP);
+
+array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt 
sub);
+celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt 
subscription);
+celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt 
subscription);
+unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt subscription);
+
+#endif /*TOPIC_SUBSCRIPTION_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt 
b/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt
new file mode 100644
index 0000000..ddc17eb
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_zmq/CMakeLists.txt
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+if (BUILD_PUBSUB_PSA_ZMQ) # the whole ZMQ pubsub admin bundle is optional
+
+       find_package(ZMQ REQUIRED)
+       find_package(CZMQ REQUIRED)
+       find_package(Jansson REQUIRED) # NOTE(review): only JANSSON_INCLUDE_DIR is used below; no Jansson library is linked - confirm
+
+       if (BUILD_ZMQ_SECURITY) # optional crypto support
+               add_definitions(-DBUILD_WITH_ZMQ_SECURITY=1)
+
+               find_package(OpenSSL 1.1.0 REQUIRED)
+               include_directories("${OPENSSL_INCLUDE_DIR}")
+
+               set (ZMQ_CRYPTO_C "private/src/zmq_crypto.c") # NOTE(review): the other sources live under src/ - confirm this path
+       endif()
+
+       add_celix_bundle(celix_pubsub_admin_zmq
+                       BUNDLE_SYMBOLICNAME "apache_celix_pubsub_admin_zmq"
+                       VERSION "1.0.0"
+                       SOURCES
+                       src/psa_activator.c
+                       src/pubsub_admin_impl.c
+                       src/topic_subscription.c
+                       src/topic_publication.c
+                       ${ZMQ_CRYPTO_C} # only set when BUILD_ZMQ_SECURITY is enabled
+       )
+
+       set_target_properties(celix_pubsub_admin_zmq PROPERTIES INSTALL_RPATH 
"$ORIGIN")
+       target_link_libraries(celix_pubsub_admin_zmq PRIVATE
+                       Celix::pubsub_spi
+                       Celix::framework Celix::dfi Celix::log_helper
+                       ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} 
${OPENSSL_CRYPTO_LIBRARY}
+       )
+       target_include_directories(celix_pubsub_admin_zmq PRIVATE
+               ${ZMQ_INCLUDE_DIR}
+               ${CZMQ_INCLUDE_DIR}
+               ${JANSSON_INCLUDE_DIR}
+               src
+       )
+
+       install_celix_bundle(celix_pubsub_admin_zmq EXPORT celix COMPONENT 
pubsub)
+       target_link_libraries(celix_pubsub_admin_zmq PRIVATE Celix::shell_api)
+       add_library(Celix::pubsub_admin_zmq ALIAS celix_pubsub_admin_zmq) # namespaced alias for the bundle target
+endif()

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c 
b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
new file mode 100644
index 0000000..008dff5
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_zmq/src/psa_activator.c
@@ -0,0 +1,186 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * psa_activator.c
+ *
+ *  \date       Sep 30, 2011
+ *  \author            <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+
+#include "bundle_activator.h"
+#include "service_tracker.h"
+
+#include "pubsub_admin_impl.h"
+
+
+struct activator { /* State shared between the bundle activator callbacks of this file. */
+       pubsub_admin_pt admin; /* created in bundleActivator_create, destroyed in bundleActivator_destroy */
+       pubsub_admin_service_pt adminService; /* allocated in bundleActivator_start, freed in bundleActivator_stop */
+       service_registration_pt registration; /* registration of adminService under PUBSUB_ADMIN_SERVICE */
+       service_tracker_pt serializerTracker; /* tracker for PUBSUB_SERIALIZER_SERVICE; opened in start, closed in stop */
+};
+
+
+/**
+ * Prints a heading followed by every key of the given map, one per line.
+ * Nothing is printed when the map is NULL or empty.
+ *
+ * The keys are printed with "%s", so the map must be keyed by C strings.
+ */
+static void shellCommand_printKeys(FILE *out, const char *heading, hash_map_pt map) {
+    if (map == NULL || hashMap_isEmpty(map)) {
+        return;
+    }
+    fprintf(out, "%s\n", heading);
+    for (hash_map_iterator_t iter = hashMapIterator_construct(map); hashMapIterator_hasNext(&iter);) {
+        const char *key = (const char *) hashMapIterator_nextKey(&iter);
+        fprintf(out, "    %s\n", key);
+    }
+}
+
+/**
+ * Shell command callback ("psa_zmq_info"): lists the topic keys currently
+ * known to the ZMQ pubsub admin, grouped per map.
+ *
+ * @param handle       the struct activator registered as the command handle
+ * @param commandLine  unused
+ * @param outStream    stream the overview is written to
+ * @param errorStream  unused
+ * @return always CELIX_SUCCESS
+ *
+ * NOTE(review): the maps are read without taking the admin's locks, exactly
+ * as before; confirm whether that is acceptable for a diagnostic command.
+ */
+static celix_status_t shellCommand(void *handle, char * commandLine, FILE *outStream, FILE *errorStream) {
+    struct activator *act= (struct activator*)handle;
+
+    (void) commandLine;
+    (void) errorStream;
+
+    shellCommand_printKeys(outStream, "External Publications:", act->admin->externalPublications);
+    shellCommand_printKeys(outStream, "Local Publications:", act->admin->localPublications);
+    shellCommand_printKeys(outStream, "Active Subscriptions:", act->admin->subscriptions);
+    shellCommand_printKeys(outStream, "Pending Subscriptions:", act->admin->pendingSubscriptions);
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Allocates the activator state, creates the ZMQ pubsub admin, creates the
+ * tracker for PUBSUB_SERIALIZER_SERVICE and registers the "psa_zmq_info"
+ * shell command.
+ *
+ * @param context   bundle context used for all creations/registrations
+ * @param userData  out: the new struct activator on success, NULL on failure
+ * @return CELIX_SUCCESS, CELIX_ENOMEM when the activator cannot be allocated,
+ *         or the status of the first creation step that failed
+ *
+ * On failure everything created so far is released again, so the caller
+ * never receives a pointer to a partially torn-down activator.
+ */
+celix_status_t bundleActivator_create(bundle_context_pt context, void **userData) {
+       celix_status_t status = CELIX_SUCCESS;
+       struct activator *activator;
+
+       *userData = NULL;
+
+       activator = calloc(1, sizeof(*activator));
+       if (!activator) {
+               return CELIX_ENOMEM;
+       }
+
+       status = pubsubAdmin_create(context, &(activator->admin));
+
+       if(status == CELIX_SUCCESS){
+               service_tracker_customizer_pt customizer = NULL;
+               status = serviceTrackerCustomizer_create(activator->admin,
+                               NULL,
+                               pubsubAdmin_serializerAdded,
+                               NULL,
+                               pubsubAdmin_serializerRemoved,
+                               &customizer);
+               if(status == CELIX_SUCCESS){
+                       status = serviceTracker_create(context, PUBSUB_SERIALIZER_SERVICE, customizer, &(activator->serializerTracker));
+                       if (status != CELIX_SUCCESS) {
+                               /* The tracker was not created, so the customizer is still ours to destroy. */
+                               serviceTrackerCustomizer_destroy(customizer);
+                       }
+               }
+
+               if (status != CELIX_SUCCESS) {
+                       pubsubAdmin_destroy(activator->admin);
+                       activator->admin = NULL;
+               }
+       }
+
+       if (status != CELIX_SUCCESS) {
+               /* Nothing usable was built: do not leak the activator or hand it out. */
+               free(activator);
+               return status;
+       }
+
+       /* Registration of the shell command is best effort; its result is not checked. */
+       properties_pt shellProps = properties_create();
+       properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "psa_zmq_info");
+       properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "psa_zmq_info");
+       properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "psa_zmq_info: Overview of PubSub ZMQ Admin");
+       activator->admin->shellCmdService.handle = activator;
+       activator->admin->shellCmdService.executeCommand = shellCommand;
+       bundleContext_registerService(context, OSGI_SHELL_COMMAND_SERVICE_NAME, &activator->admin->shellCmdService, shellProps, &activator->admin->shellCmdReg);
+
+       *userData = activator;
+
+       return status;
+}
+
+/**
+ * Publishes the pubsub admin as a PUBSUB_ADMIN_SERVICE and opens the
+ * serializer tracker.
+ *
+ * @param userData  the struct activator produced by bundleActivator_create
+ * @param context   bundle context used to register the service
+ * @return CELIX_ENOMEM when the service struct cannot be allocated, otherwise
+ *         the sum of the registration status and the tracker-open status
+ */
+celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) {
+       struct activator *act = userData;
+       pubsub_admin_service_pt svc = calloc(1, sizeof(*svc));
+
+       if (svc == NULL) {
+               return CELIX_ENOMEM;
+       }
+
+       /* Fill in the service function table; every entry forwards to the admin. */
+       svc->admin = act->admin;
+       svc->addPublication = pubsubAdmin_addPublication;
+       svc->removePublication = pubsubAdmin_removePublication;
+       svc->addSubscription = pubsubAdmin_addSubscription;
+       svc->removeSubscription = pubsubAdmin_removeSubscription;
+       svc->closeAllPublications = pubsubAdmin_closeAllPublications;
+       svc->closeAllSubscriptions = pubsubAdmin_closeAllSubscriptions;
+       svc->matchEndpoint = pubsubAdmin_matchEndpoint;
+
+       act->adminService = svc;
+
+       celix_status_t result = bundleContext_registerService(context, PUBSUB_ADMIN_SERVICE, svc, NULL, &act->registration);
+       result += serviceTracker_open(act->serializerTracker);
+
+       return result;
+}
+
+/**
+ * Closes the serializer tracker, unregisters the shell command and the admin
+ * service, and frees the admin service struct.
+ *
+ * @param userData  the struct activator produced by bundleActivator_create
+ * @param context   unused
+ * @return the accumulated status of closing the tracker and unregistering
+ *         the admin service
+ */
+celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) {
+       struct activator *act = userData;
+       celix_status_t result = CELIX_SUCCESS;
+
+       result += serviceTracker_close(act->serializerTracker);
+
+       /* The shell command registration lives inside the admin; its status is not tracked. */
+       serviceRegistration_unregister(act->admin->shellCmdReg);
+       act->admin->shellCmdReg = NULL;
+
+       result += serviceRegistration_unregister(act->registration);
+       act->registration = NULL;
+
+       free(act->adminService);
+       act->adminService = NULL;
+
+       return result;
+}
+
+/**
+ * Destroys the serializer tracker and the pubsub admin, then frees the
+ * activator itself.
+ *
+ * @param userData  the struct activator produced by bundleActivator_create
+ * @param context   unused
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt context) {
+       struct activator *act = userData;
+
+       serviceTracker_destroy(act->serializerTracker);
+
+       pubsubAdmin_destroy(act->admin);
+       act->admin = NULL;
+
+       free(act);
+
+       return CELIX_SUCCESS;
+}
+
+

http://git-wip-us.apache.org/repos/asf/celix/blob/3bce889b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c 
b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
new file mode 100644
index 0000000..e9bb6c3
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
@@ -0,0 +1,1183 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_admin_impl.c
+ *
+ *  \date       Sep 30, 2011
+ *  \author            <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#include "pubsub_admin_impl.h"
+#include <zmq.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <arpa/inet.h>
+#include <sys/socket.h>
+#include <netdb.h>
+
+#ifndef ANDROID
+#include <ifaddrs.h>
+#endif
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include "constants.h"
+#include "utils.h"
+#include "hash_map.h"
+#include "array_list.h"
+#include "bundle_context.h"
+#include "bundle.h"
+#include "service_reference.h"
+#include "service_registration.h"
+#include "log_helper.h"
+#include "log_service.h"
+#include "celix_threads.h"
+#include "service_factory.h"
+
+#include "topic_subscription.h"
+#include "topic_publication.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_utils.h"
+#include "pubsub/subscriber.h"
+
+#define MAX_KEY_FOLDER_PATH_LENGTH 512 /* size of the buffer holding the CURVE public-key folder path in pubsubAdmin_create */
+
+static const char *DEFAULT_IP = "127.0.0.1"; /* address used by pubsubAdmin_create when no IP is configured or detected */
+
+static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** 
ip); /* pubsubAdmin_create frees the string returned through ip */
+static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP); /* called by pubsubAdmin_addSubscription when no publisher is known for the topic yet */
+static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP); /* handles subscriptions on PUBSUB_ANY_SUB_TOPIC */
+
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt 
admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **svcOut, const char 
**serTypeOut); /* callers treat any status other than CELIX_SUCCESS as "no serializer available" */
+static void connectTopicPubSubToSerializer(pubsub_admin_pt 
admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool 
isPublication); /* called with isPublication=false once a topic subscription has been stored */
+static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void 
*topicPubSub,bool isPublication); /* presumably the counterpart of connectTopicPubSubToSerializer - TODO confirm */
+
+/**
+ * Allocates and initializes a ZMQ pubsub admin.
+ *
+ * Sets up the bookkeeping maps/lists and their mutexes, starts the log
+ * helper, and reads the configuration from the bundle context:
+ *  - PSA_IP / PSA_ITF: address used for service annunciation (DEFAULT_IP
+ *    when neither yields an address),
+ *  - PSA_ZMQ_BASE_PORT / PSA_ZMQ_MAX_PORT: port range,
+ *  - "PSA_NR_ZMQ_THREADS": number of ZMQ I/O threads (accepted range 1..49),
+ *  - the score keys and PSA_ZMQ_VERBOSE_KEY.
+ * When built with BUILD_WITH_ZMQ_SECURITY a zauth actor is started as well.
+ *
+ * @param context  bundle context the admin is bound to
+ * @param admin    out: the newly created admin
+ * @return CELIX_SUCCESS, CELIX_ENOMEM when the admin cannot be allocated, or
+ *         CELIX_SERVICE_EXCEPTION when security is enabled but CURVE is
+ *         not supported by the ZeroMQ build
+ */
+celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) {
+    celix_status_t status = CELIX_SUCCESS;
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+    if (!zsys_has_curve()){
+        printf("PSA_ZMQ: zeromq curve unsupported\n");
+        return CELIX_SERVICE_EXCEPTION;
+    }
+#endif
+
+    *admin = calloc(1, sizeof(**admin));
+
+    if (!*admin){
+        return CELIX_ENOMEM;
+    }
+
+    const char *ip = NULL;
+    char *detectedIp = NULL;
+    (*admin)->bundle_context= context;
+    (*admin)->localPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+    (*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+    (*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+    (*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+    (*admin)->topicSubscriptionsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL);
+    (*admin)->topicPublicationsPerSerializer  = hashMap_create(NULL, NULL, NULL, NULL);
+    arrayList_create(&((*admin)->noSerializerSubscriptions));
+    arrayList_create(&((*admin)->noSerializerPublications));
+    arrayList_create(&((*admin)->serializerList));
+
+    celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL);
+    celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL);
+    celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL);
+    celixThreadMutex_create(&(*admin)->serializerListLock, NULL);
+    celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL);
+
+    /* These two locks are created as recursive mutexes. */
+    celixThreadMutexAttr_create(&(*admin)->noSerializerPendingsAttr);
+    celixThreadMutexAttr_settype(&(*admin)->noSerializerPendingsAttr, CELIX_THREAD_MUTEX_RECURSIVE);
+    celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, &(*admin)->noSerializerPendingsAttr);
+
+    celixThreadMutexAttr_create(&(*admin)->pendingSubscriptionsAttr);
+    celixThreadMutexAttr_settype(&(*admin)->pendingSubscriptionsAttr, CELIX_THREAD_MUTEX_RECURSIVE);
+    celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, &(*admin)->pendingSubscriptionsAttr);
+
+    if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) {
+        logHelper_start((*admin)->loghelper);
+    }
+
+    bundleContext_getProperty(context,PSA_IP , &ip);
+
+#ifndef ANDROID
+    if (ip == NULL) {
+        const char *interface = NULL;
+
+        bundleContext_getProperty(context, PSA_ITF, &interface);
+        if (pubsubAdmin_getIpAdress(interface, &detectedIp) != CELIX_SUCCESS) {
+            /* interface may be unset; never hand a NULL pointer to "%s". */
+            logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: Could not retrieve IP adress for interface %s", interface != NULL ? interface : "(null)");
+        }
+
+        ip = detectedIp;
+    }
+#endif
+
+    if (ip != NULL) {
+        logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %s for service annunciation", ip);
+        (*admin)->ipAddress = strdup(ip);
+    }
+    else {
+        logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: No IP address for service annunciation set. Using %s", DEFAULT_IP);
+        (*admin)->ipAddress = strdup(DEFAULT_IP);
+    }
+
+    /* ipAddress holds its own copy; free(NULL) is a no-op. */
+    free(detectedIp);
+
+    const char* basePortStr = NULL;
+    const char* maxPortStr = NULL;
+    char* endptrBase = NULL;
+    char* endptrMax = NULL;
+    /*
+     * The fallback strings are the macro *names*, not their values: they never
+     * parse as a number, so an unset property ends up in the numeric defaults
+     * assigned below.
+     */
+    bundleContext_getPropertyWithDefault(context, PSA_ZMQ_BASE_PORT, "PSA_ZMQ_DEFAULT_BASE_PORT", &basePortStr);
+    bundleContext_getPropertyWithDefault(context, PSA_ZMQ_MAX_PORT, "PSA_ZMQ_DEFAULT_MAX_PORT", &maxPortStr);
+    (*admin)->basePort = strtol(basePortStr, &endptrBase, 10);
+    (*admin)->maxPort = strtol(maxPortStr, &endptrMax, 10);
+    /* Reject trailing garbage and also the "no digits at all" case (e.g. an empty value). */
+    if (endptrBase == basePortStr || *endptrBase != '\0') {
+        (*admin)->basePort = PSA_ZMQ_DEFAULT_BASE_PORT;
+    }
+    if (endptrMax == maxPortStr || *endptrMax != '\0') {
+        (*admin)->maxPort = PSA_ZMQ_DEFAULT_MAX_PORT;
+    }
+
+    // Disable Signal Handling by CZMQ
+    setenv("ZSYS_SIGHANDLER", "false", true);
+
+    const char *nrZmqThreads = NULL;
+    bundleContext_getProperty(context, "PSA_NR_ZMQ_THREADS", &nrZmqThreads);
+
+    if(nrZmqThreads != NULL) {
+        char *endPtr = NULL;
+        /* Range-check the full strtoul result before narrowing it. */
+        unsigned long parsedThreads = strtoul(nrZmqThreads, &endPtr, 10);
+        if(endPtr != nrZmqThreads && parsedThreads > 0 && parsedThreads < 50) {
+            unsigned int nrThreads = (unsigned int) parsedThreads;
+            zsys_set_io_threads(nrThreads);
+            logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %u threads for ZMQ", nrThreads);
+            printf("PSA_ZMQ: Using %u threads for ZMQ\n", nrThreads);
+        }
+    }
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+    // Setup authenticator
+    zactor_t* auth = zactor_new (zauth, NULL);
+    zstr_sendx(auth, "VERBOSE", NULL);
+
+    // Load all public keys of subscribers into the application
+    // This step is done for authenticating subscribers
+    char curve_folder_path[MAX_KEY_FOLDER_PATH_LENGTH];
+    char* keys_bundle_dir = pubsub_getKeysBundleDir(context);
+    snprintf(curve_folder_path, MAX_KEY_FOLDER_PATH_LENGTH, "%s/META-INF/keys/subscriber/public", keys_bundle_dir);
+    zstr_sendx (auth, "CURVE", curve_folder_path, NULL);
+    free(keys_bundle_dir);
+
+    (*admin)->zmq_auth = auth;
+#endif
+
+
+    /* Scores start at their compiled-in defaults and may be overridden per framework. */
+    (*admin)->defaultScore = PSA_ZMQ_DEFAULT_SCORE;
+    (*admin)->qosSampleScore = PSA_ZMQ_DEFAULT_QOS_SAMPLE_SCORE;
+    (*admin)->qosControlScore = PSA_ZMQ_DEFAULT_QOS_CONTROL_SCORE;
+
+    const char *defaultScoreStr = NULL;
+    const char *sampleScoreStr = NULL;
+    const char *controlScoreStr = NULL;
+    bundleContext_getProperty(context, PSA_ZMQ_DEFAULT_SCORE_KEY, &defaultScoreStr);
+    bundleContext_getProperty(context, PSA_ZMQ_QOS_SAMPLE_SCORE_KEY, &sampleScoreStr);
+    bundleContext_getProperty(context, PSA_ZMQ_QOS_CONTROL_SCORE_KEY, &controlScoreStr);
+
+    if (defaultScoreStr != NULL) {
+        (*admin)->defaultScore = strtof(defaultScoreStr, NULL);
+    }
+    if (sampleScoreStr != NULL) {
+        (*admin)->qosSampleScore = strtof(sampleScoreStr, NULL);
+    }
+    if (controlScoreStr != NULL) {
+        (*admin)->qosControlScore = strtof(controlScoreStr, NULL);
+    }
+
+    (*admin)->verbose = PSA_ZMQ_DEFAULT_VERBOSE;
+    const char *verboseStr = NULL;
+    bundleContext_getProperty(context, PSA_ZMQ_VERBOSE_KEY, &verboseStr);
+    if (verboseStr != NULL) {
+        /* Case-insensitive prefix match: any value starting with "true" enables verbose mode. */
+        (*admin)->verbose = strncasecmp("true", verboseStr, strlen("true")) == 0;
+    }
+
+    if ((*admin)->verbose) {
+        printf("PSA ZMQ Using base port %u to max port %u\n", (*admin)->basePort, (*admin)->maxPort);
+    }
+
+    return status;
+}
+
+
+/* Frees every key and destroys every array-list value of the map, then destroys the map itself. */
+static void pubsubAdmin_destroyKeyedListMap(hash_map_pt map) {
+    hash_map_iterator_pt it = hashMapIterator_create(map);
+    while(hashMapIterator_hasNext(it)){
+        hash_map_entry_pt current = hashMapIterator_nextEntry(it);
+        free((char*)hashMapEntry_getKey(current));
+        arrayList_destroy((array_list_pt)hashMapEntry_getValue(current));
+    }
+    hashMapIterator_destroy(it);
+    hashMap_destroy(map,false,false);
+}
+
+/* Destroys every array-list value of the map (keys are left alone), then destroys the map itself. */
+static void pubsubAdmin_destroyListMap(hash_map_pt map) {
+    hash_map_iterator_pt it = hashMapIterator_create(map);
+    while(hashMapIterator_hasNext(it)){
+        arrayList_destroy((array_list_pt)hashMapIterator_nextValue(it));
+    }
+    hashMapIterator_destroy(it);
+    hashMap_destroy(map,false,false);
+}
+
+/**
+ * Releases everything owned by the admin: the annunciation address, all
+ * bookkeeping maps and lists, the mutexes and their attributes, the log
+ * helper, the zauth actor (security builds only) and finally the admin.
+ *
+ * @param admin  the admin to destroy; must not be used afterwards
+ * @return always CELIX_SUCCESS
+ */
+celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
+{
+    free(admin->ipAddress);
+
+    /* Each container is torn down while holding the lock that guards it. */
+    celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
+    pubsubAdmin_destroyKeyedListMap(admin->pendingSubscriptions);
+    celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
+
+    celixThreadMutex_lock(&admin->subscriptionsLock);
+    hashMap_destroy(admin->subscriptions,false,false);
+    celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+    celixThreadMutex_lock(&admin->localPublicationsLock);
+    hashMap_destroy(admin->localPublications,true,false);
+    celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+    celixThreadMutex_lock(&admin->externalPublicationsLock);
+    pubsubAdmin_destroyKeyedListMap(admin->externalPublications);
+    celixThreadMutex_unlock(&admin->externalPublicationsLock);
+
+    celixThreadMutex_lock(&admin->serializerListLock);
+    arrayList_destroy(admin->serializerList);
+    celixThreadMutex_unlock(&admin->serializerListLock);
+
+    celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+    arrayList_destroy(admin->noSerializerSubscriptions);
+    arrayList_destroy(admin->noSerializerPublications);
+    celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+
+    celixThreadMutex_lock(&admin->usedSerializersLock);
+    pubsubAdmin_destroyListMap(admin->topicSubscriptionsPerSerializer);
+    pubsubAdmin_destroyListMap(admin->topicPublicationsPerSerializer);
+    celixThreadMutex_unlock(&admin->usedSerializersLock);
+
+    /* With the containers gone, the locks themselves can be destroyed. */
+    celixThreadMutex_destroy(&admin->usedSerializersLock);
+    celixThreadMutex_destroy(&admin->serializerListLock);
+    celixThreadMutex_destroy(&admin->pendingSubscriptionsLock);
+
+    celixThreadMutexAttr_destroy(&admin->noSerializerPendingsAttr);
+    celixThreadMutex_destroy(&admin->noSerializerPendingsLock);
+
+    celixThreadMutexAttr_destroy(&admin->pendingSubscriptionsAttr);
+    celixThreadMutex_destroy(&admin->subscriptionsLock);
+
+    celixThreadMutex_destroy(&admin->localPublicationsLock);
+    celixThreadMutex_destroy(&admin->externalPublicationsLock);
+
+    logHelper_stop(admin->loghelper);
+    logHelper_destroy(&admin->loghelper);
+
+#ifdef BUILD_WITH_ZMQ_SECURITY
+    if (admin->zmq_auth != NULL){
+        zactor_destroy(&(admin->zmq_auth));
+    }
+#endif
+
+    free(admin);
+
+    return CELIX_SUCCESS;
+}
+
+static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){ /*
+     * Makes sure a topic subscription for PUBSUB_ANY_SUB_TOPIC exists.
+     *
+     * When none is stored yet, a serializer is selected, the subscription is
+     * created, connected to every local and external publisher that has an
+     * endpoint URL, given subEP as subscriber, started, and finally stored in
+     * admin->subscriptions. When no serializer is available subEP is added to
+     * admin->noSerializerSubscriptions instead and the failing status is
+     * returned. The whole operation runs under admin->subscriptionsLock.
+     *
+     * Returns CELIX_SUCCESS or the accumulated status of the failed steps.
+     */
+    celix_status_t status = CELIX_SUCCESS;
+
+    celixThreadMutex_lock(&admin->subscriptionsLock);
+
+    topic_subscription_pt any_sub = hashMap_get(admin->subscriptions, 
PUBSUB_ANY_SUB_TOPIC);
+
+    if(any_sub==NULL){ /* an already stored any-subscription is left untouched */
+
+        int i;
+        pubsub_serializer_service_t *best_serializer = NULL;
+        const char *serType = NULL;
+        if( (status=pubsubAdmin_getBestSerializer(admin, subEP, 
&best_serializer, &serType)) == CELIX_SUCCESS){
+            status = pubsub_topicSubscriptionCreate(admin->bundle_context, 
PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, 
serType, &any_sub);
+        }
+        else{ /* status stays non-success here, so the connect/start/store steps below are skipped */
+            printf("PSA_ZMQ: Cannot find a serializer for subscribing topic 
%s. Adding it to pending list.\n",
+                    properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME));
+            celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+            arrayList_add(admin->noSerializerSubscriptions,subEP);
+            celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+        }
+
+        if (status == CELIX_SUCCESS){
+
+            /* Connect all internal publishers */
+            celixThreadMutex_lock(&admin->localPublicationsLock);
+            hash_map_iterator_pt lp_iter 
=hashMapIterator_create(admin->localPublications);
+            while(hashMapIterator_hasNext(lp_iter)){
+                service_factory_pt factory = 
(service_factory_pt)hashMapIterator_nextValue(lp_iter);
+                topic_publication_pt topic_pubs = 
(topic_publication_pt)factory->handle;
+                array_list_pt topic_publishers = 
pubsub_topicPublicationGetPublisherList(topic_pubs);
+
+                if(topic_publishers!=NULL){
+                    for(i=0;i<arrayList_size(topic_publishers);i++){
+                        pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
+                        if(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL) !=NULL){ /* publishers without a URL are skipped */
+                            status += 
pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) 
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+                        }
+                    }
+                    arrayList_destroy(topic_publishers); /* the list obtained above is destroyed here, after use */
+                }
+            }
+            hashMapIterator_destroy(lp_iter);
+            celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+            /* Connect also all external publishers */
+            celixThreadMutex_lock(&admin->externalPublicationsLock);
+            hash_map_iterator_pt extp_iter 
=hashMapIterator_create(admin->externalPublications);
+            while(hashMapIterator_hasNext(extp_iter)){
+                array_list_pt ext_pub_list = 
(array_list_pt)hashMapIterator_nextValue(extp_iter);
+                if(ext_pub_list!=NULL){ /* this list stays owned by the map and is not destroyed here */
+                    for(i=0;i<arrayList_size(ext_pub_list);i++){
+                        pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+                        if(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL) !=NULL){
+                            status += 
pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) 
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+                        }
+                    }
+                }
+            }
+            hashMapIterator_destroy(extp_iter);
+            celixThreadMutex_unlock(&admin->externalPublicationsLock);
+
+
+            pubsub_topicSubscriptionAddSubscriber(any_sub,subEP);
+
+            status += pubsub_topicSubscriptionStart(any_sub);
+
+        }
+
+        if (status == CELIX_SUCCESS){ /* NOTE(review): when a connect or the start failed, any_sub is neither stored nor destroyed here - possible leak, confirm */
+            
hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub);
+            connectTopicPubSubToSerializer(admin, best_serializer, any_sub, 
false);
+        }
+
+    }
+
+    celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+    return status;
+}
+
+/**
+ * A subscriber service is registered and this PSA has won the match
+ * (based on QoS or other metadata).
+ * Will update the pubsub endpoint with the chosen pubsub admin and serializer 
type
+ *
+ * Subscriptions on PUBSUB_ANY_SUB_TOPIC are delegated to
+ * pubsubAdmin_addAnySubscription. For any other topic:
+ *  - when no local or external publisher is known for the scope/topic key,
+ *    the endpoint is added to the pending list;
+ *  - otherwise, if no topic subscription exists yet for the key, one is
+ *    created with the selected serializer, connected to the known publishers
+ *    that have an endpoint URL, started and stored in admin->subscriptions
+ *    (or, without a serializer, the endpoint is added to
+ *    admin->noSerializerSubscriptions and the failing status is returned);
+ *  - on success the subscriber count of the topic subscription is increased.
+ *
+ * Locks are taken in the order pendingSubscriptionsLock, subscriptionsLock,
+ * localPublicationsLock, externalPublicationsLock and released in reverse.
+ *
+ * Returns CELIX_SUCCESS or the accumulated status of the failed steps.
+ */
+celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
+    celix_status_t status = CELIX_SUCCESS;
+
+    if(strcmp(properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME), PUBSUB_ANY_SUB_TOPIC)==0) { /* NOTE(review): the property value goes straight into strcmp; presumably the topic name is always set - confirm */
+        return pubsubAdmin_addAnySubscription(admin,subEP);
+    }
+
+    /* Check whether a publisher is already known for this topic; otherwise 
put the subscription in the pending hashmap */
+    celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
+    celixThreadMutex_lock(&admin->subscriptionsLock);
+    celixThreadMutex_lock(&admin->localPublicationsLock);
+    celixThreadMutex_lock(&admin->externalPublicationsLock);
+
+    char* scope_topic = 
pubsubEndpoint_createScopeTopicKey(properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME)); /* freed at the end of this function */
+
+    service_factory_pt factory = 
(service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
+    array_list_pt ext_pub_list = 
(array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
+
+    if(factory==NULL && ext_pub_list==NULL){ //No (local or external) 
publishers yet for this topic
+        pubsubAdmin_addSubscriptionToPendingList(admin,subEP);
+    }
+    else{
+        int i;
+        topic_subscription_pt subscription = hashMap_get(admin->subscriptions, 
scope_topic);
+
+        if(subscription == NULL) { /* first subscriber for this key: build the topic subscription */
+            pubsub_serializer_service_t *best_serializer = NULL;
+            const char *serType = NULL;
+            if( (status=pubsubAdmin_getBestSerializer(admin, subEP, 
&best_serializer, &serType)) == CELIX_SUCCESS){
+                status += 
pubsub_topicSubscriptionCreate(admin->bundle_context, (char*) 
properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) 
properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME), 
best_serializer, serType, &subscription);
+            }
+            else{ /* status stays non-success here, so the connect/start/store steps below are skipped */
+                printf("PSA_ZMQ: Cannot find a serializer for subscribing 
topic %s. Adding it to pending list.\n",
+                        properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME));
+                celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                arrayList_add(admin->noSerializerSubscriptions,subEP);
+                celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+            }
+
+            if (status==CELIX_SUCCESS){
+
+                /* Try to connect internal publishers */
+                if(factory!=NULL){
+                    topic_publication_pt topic_pubs = 
(topic_publication_pt)factory->handle;
+                    array_list_pt topic_publishers = 
pubsub_topicPublicationGetPublisherList(topic_pubs);
+
+                    if(topic_publishers!=NULL){
+                        for(i=0;i<arrayList_size(topic_publishers);i++){
+                            pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
+                            if(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL) !=NULL){ /* publishers without a URL are skipped */
+                                status += 
pubsub_topicSubscriptionConnectPublisher(subscription,(char*)properties_get(pubEP->endpoint_props,
 PUBSUB_ENDPOINT_URL));
+                            }
+                        }
+                        arrayList_destroy(topic_publishers); /* the list obtained above is destroyed here, after use */
+                    }
+
+                }
+
+                /* Look also for external publishers */
+                if(ext_pub_list!=NULL){ /* this list stays owned by admin->externalPublications */
+                    for(i=0;i<arrayList_size(ext_pub_list);i++){
+                        pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+                        if(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL) !=NULL){
+                            status += 
pubsub_topicSubscriptionConnectPublisher(subscription,(char*) 
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+                        }
+                    }
+                }
+
+                pubsub_topicSubscriptionAddSubscriber(subscription,subEP);
+
+                status += pubsub_topicSubscriptionStart(subscription);
+
+            }
+
+            if(status==CELIX_SUCCESS){ /* NOTE(review): when a connect or the start failed, the new subscription is neither stored nor destroyed here - possible leak, confirm */
+
+                
hashMap_put(admin->subscriptions,strdup(scope_topic),subscription);
+
+                connectTopicPubSubToSerializer(admin, best_serializer, 
subscription, false);
+            }
+        }
+
+        if (status == CELIX_SUCCESS){ /* reached for both a new and an already existing topic subscription */
+            pubsub_topicIncreaseNrSubscribers(subscription);
+        }
+    }
+
+    free(scope_topic);
+    celixThreadMutex_unlock(&admin->externalPublicationsLock);
+    celixThreadMutex_unlock(&admin->localPublicationsLock);
+    celixThreadMutex_unlock(&admin->subscriptionsLock);
+    celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
+
+
+    if (admin->verbose) { /* NOTE(review): printed regardless of status, i.e. also when the endpoint was only queued or the add failed */
+        printf("PSA_ZMQ: Added subscription [FWUUID=%s endpointUUID=%s 
scope=%s, topic=%s]\n",
+                properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+                properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_SCOPE),
+                properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC_NAME));
+        printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint 
type = %s]\n",
+                properties_get(subEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+                properties_get(subEP->endpoint_props, 
PUBSUB_SERIALIZER_TYPE_KEY),
+                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+        printf("PSA_ZMQ: \t [endpoint url = %s]\n", 
properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    }
+
+    return status;
+
+}
+
+/**
+ * Removes a subscriber endpoint from the admin.
+ *
+ * When a topic subscription is stored for the endpoint's scope/topic key, its
+ * subscriber count is decreased and, once it reaches zero, the endpoint is
+ * removed from that topic subscription. When no topic subscription is stored,
+ * the endpoint is expected to be waiting in the no-serializer pending list.
+ *
+ * @param admin  the pubsub admin
+ * @param subEP  the subscriber endpoint to remove
+ * @return CELIX_SUCCESS, the status of removing the subscriber, or
+ *         CELIX_ILLEGAL_STATE when the endpoint is known in neither place
+ */
+celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
+    celix_status_t result = CELIX_SUCCESS;
+    const char *scope = properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE);
+    const char *topic = properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME);
+
+    if (admin->verbose) {
+        printf("PSA_ZMQ: Removing subscription [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+                scope,
+                topic);
+        printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+                properties_get(subEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+                properties_get(subEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+                properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+        printf("PSA_ZMQ: \t [endpoint url = %s]\n", properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    }
+
+    char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+
+    celixThreadMutex_lock(&admin->subscriptionsLock);
+    topic_subscription_pt topicSub = (topic_subscription_pt)hashMap_get(admin->subscriptions,key);
+    if (topicSub != NULL) {
+        pubsub_topicDecreaseNrSubscribers(topicSub);
+        if (pubsub_topicGetNrSubscribers(topicSub) == 0) {
+            result = pubsub_topicSubscriptionRemoveSubscriber(topicSub,subEP);
+        }
+    }
+    celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+    if (topicSub == NULL) {
+        /* No active subscription: the endpoint may still be waiting for a serializer. */
+        celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+        if (!arrayList_removeElement(admin->noSerializerSubscriptions, subEP)) {
+            result = CELIX_ILLEGAL_STATE;
+        }
+        celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+    }
+
+    free(key);
+
+    return result;
+}
+
+
+/**
+ * A bundle has shown interest in a publisher service and this PSA has won the
+ * match based on filter or embedded topic.properties (extender pattern),
+ * OR
+ * a remote publication has been discovered and forwarded to this call.
+ *
+ * For endpoints owned by this framework the endpoint properties are updated
+ * with the chosen pubsub admin type and serializer type.
+ *
+ * Steps:
+ *  1. Own endpoints without a URL get (or join) a local topic publication; if
+ *     no serializer is available the endpoint is parked in
+ *     admin->noSerializerPublications. All other endpoints are stored in
+ *     admin->externalPublications.
+ *  2. Pending subscriptions for the same scope/topic are re-submitted.
+ *  3. If the endpoint has a URL, it is queued for connection on the matching
+ *     topic subscription and on the ANY subscription, when those exist.
+ *
+ * Returns CELIX_INVALID_BUNDLE_CONTEXT when the framework UUID cannot be read,
+ * CELIX_ILLEGAL_ARGUMENT when the endpoint carries no framework UUID,
+ * otherwise the status of the serializer lookup / publication create / start.
+ */
+celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP) {
+    celix_status_t status = CELIX_SUCCESS;
+
+    const char *fwUUID = NULL;
+    bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
+    if (fwUUID == NULL) {
+        printf("PSA_ZMQ: Cannot retrieve fwUUID.\n");
+        return CELIX_INVALID_BUNDLE_CONTEXT;
+    }
+
+    const char *epFwUUID = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
+    if (epFwUUID == NULL) {
+        /* Comparing against a NULL string is undefined behaviour; reject the endpoint instead */
+        printf("PSA_ZMQ: Cannot add publication, endpoint has no framework UUID.\n");
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+    bool isOwn = strncmp(fwUUID, epFwUUID, 128) == 0;
+
+    if (isOwn) {
+        //should be null, will be set in this call
+        assert(properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY) == NULL);
+        assert(properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY) == NULL);
+    } else {
+        //inverse: ADMIN_TYPE_KEY and SERIALIZER_TYPE should not be null
+        assert(properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY) != NULL);
+        assert(properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY) != NULL);
+    }
+
+    if (isOwn) {
+        properties_set(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY, PSA_ZMQ_PUBSUB_ADMIN_TYPE);
+    }
+
+    char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+
+    if ((strcmp(epFwUUID, fwUUID) == 0) &&
+            (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) == NULL)) {
+
+        /* Own endpoint that has no URL yet: it needs a local topic publication */
+        celixThreadMutex_lock(&admin->localPublicationsLock);
+
+        service_factory_pt factory = (service_factory_pt) hashMap_get(admin->localPublications, scope_topic);
+
+        if (factory == NULL) {
+            topic_publication_pt pub = NULL;
+            pubsub_serializer_service_t *best_serializer = NULL;
+            const char *serType = NULL;
+            if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer, &serType)) == CELIX_SUCCESS){
+                status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, best_serializer, serType, admin->ipAddress, admin->basePort, admin->maxPort, &pub);
+                if (isOwn) {
+                    properties_set(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY, serType);
+                }
+            }
+            else {
+                printf("PSA_ZMQ: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n",
+                        properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+                celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                arrayList_add(admin->noSerializerPublications,pubEP);
+                celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+            }
+
+            if (status == CELIX_SUCCESS) {
+                status = pubsub_topicPublicationStart(admin->bundle_context, pub, &factory);
+                if (status == CELIX_SUCCESS && factory != NULL) {
+                    /* The map owns its own copy of the key; scope_topic is freed at the end of this call */
+                    hashMap_put(admin->localPublications, strdup(scope_topic), factory);
+                    connectTopicPubSubToSerializer(admin, best_serializer, pub, true);
+                }
+            } else {
+                printf("PSA_ZMQ: Cannot create a topicPublication for scope=%s, topic=%s (bundle %s).\n",
+                        properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+                        properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME),
+                        properties_get(pubEP->endpoint_props, PUBSUB_BUNDLE_ID));
+            }
+        } else {
+            //just add the new EP to the list
+            topic_publication_pt pub = (topic_publication_pt) factory->handle;
+            pubsub_topicPublicationAddPublisherEP(pub, pubEP);
+        }
+
+        celixThreadMutex_unlock(&admin->localPublicationsLock);
+    }
+    else{
+
+        /* Remote endpoint, or own endpoint that already has a URL: track it as external publication */
+        celixThreadMutex_lock(&admin->externalPublicationsLock);
+        array_list_pt ext_pub_list = (array_list_pt) hashMap_get(admin->externalPublications, scope_topic);
+        if (ext_pub_list == NULL) {
+            arrayList_create(&ext_pub_list);
+            hashMap_put(admin->externalPublications, strdup(scope_topic), ext_pub_list);
+        }
+
+        arrayList_add(ext_pub_list, pubEP);
+
+        celixThreadMutex_unlock(&admin->externalPublicationsLock);
+    }
+
+    /* Re-evaluate the pending subscriptions */
+    /* NOTE(review): pubsubAdmin_addSubscription is called below while pendingSubscriptionsLock is held;
+     * this presumably relies on that mutex being recursive - confirm against the admin's lock setup. */
+    celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
+
+    hash_map_entry_pt pendingSub = hashMap_getEntry(admin->pendingSubscriptions, scope_topic);
+    if (pendingSub != NULL) { //There were pending subscriptions for the just published topic. Let's connect them.
+        char* topic = (char*) hashMapEntry_getKey(pendingSub);
+        array_list_pt pendingSubList = (array_list_pt) hashMapEntry_getValue(pendingSub);
+        int i;
+        for (i = 0; i < arrayList_size(pendingSubList); i++) {
+            pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) arrayList_get(pendingSubList, i);
+            pubsubAdmin_addSubscription(admin, subEP);
+        }
+        hashMap_remove(admin->pendingSubscriptions, scope_topic);
+        arrayList_clear(pendingSubList);
+        arrayList_destroy(pendingSubList);
+        free(topic);
+    }
+
+    celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
+
+    /* Connect the new publisher to the subscription for its topic, if there is any */
+    celixThreadMutex_lock(&admin->subscriptionsLock);
+
+    topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic);
+    if (sub != NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) != NULL) {
+        pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, (char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    }
+
+    /* And check also for ANY subscription */
+    topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
+    if (any_sub != NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) != NULL) {
+        pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, (char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    }
+
+    free(scope_topic);
+
+    celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+
+    if (admin->verbose) {
+        printf("PSA_ZMQ: Added publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+        printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+                properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+                properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+        /* Log prefix aligned with the rest of this admin (was a copy-pasted "PSA_UDPMC") */
+        printf("PSA_ZMQ: \t [endpoint url = %s, own = %i]\n",
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL),
+                isOwn);
+    }
+
+
+    return status;
+
+}
+
+/**
+ * Removes a publisher endpoint from this admin.
+ *
+ * Endpoints owned by this framework are removed from their local topic
+ * publication, or from admin->noSerializerPublications when no local
+ * publication exists for the scope/topic. Other endpoints are removed from
+ * admin->externalPublications; the per-topic list is dropped once it is empty.
+ *
+ * Afterwards, if the endpoint has a URL and no remaining external endpoint on
+ * the same topic shares that URL, the URL is queued for disconnection on the
+ * matching topic subscription and on the ANY subscription, when those exist.
+ *
+ * Returns CELIX_INVALID_BUNDLE_CONTEXT when the framework UUID cannot be read,
+ * CELIX_ILLEGAL_ARGUMENT when the endpoint carries no framework UUID,
+ * CELIX_ILLEGAL_STATE when an own endpoint was neither published nor parked,
+ * CELIX_SUCCESS otherwise.
+ */
+celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){
+    celix_status_t status = CELIX_SUCCESS;
+    int count = 0; /* number of remaining external endpoints that share pubEP's URL */
+
+    if (admin->verbose) {
+        printf("PSA_ZMQ: Removing publication [FWUUID=%s endpointUUID=%s scope=%s, topic=%s]\n",
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_UUID),
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+        printf("PSA_ZMQ: \t [psa type = %s, ser type = %s, pubsub endpoint type = %s]\n",
+                properties_get(pubEP->endpoint_props, PUBSUB_ADMIN_TYPE_KEY),
+                properties_get(pubEP->endpoint_props, PUBSUB_SERIALIZER_TYPE_KEY),
+                properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TYPE));
+        printf("PSA_ZMQ: \t [endpoint url = %s]\n", properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
+    }
+
+    const char* fwUUID = NULL;
+
+    bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+    if(fwUUID==NULL){
+        fprintf(stderr, "ERROR PSA_ZMQ: Cannot retrieve fwUUID.\n");
+        return CELIX_INVALID_BUNDLE_CONTEXT;
+    }
+
+    const char *epFwUUID = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_FRAMEWORK_UUID);
+    if(epFwUUID==NULL){
+        /* strcmp on a NULL string is undefined behaviour; reject the endpoint instead */
+        fprintf(stderr, "ERROR PSA_ZMQ: Cannot remove publication, endpoint has no framework UUID.\n");
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+
+    /* May legitimately be NULL (e.g. an own endpoint that was never started) */
+    const char *pubUrl = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL);
+
+    char *scope_topic = pubsubEndpoint_createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+
+    if(strcmp(epFwUUID,fwUUID)==0){
+
+        celixThreadMutex_lock(&admin->localPublicationsLock);
+        service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
+        if(factory!=NULL){
+            topic_publication_pt pub = (topic_publication_pt)factory->handle;
+            pubsub_topicPublicationRemovePublisherEP(pub,pubEP);
+        }
+        celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+        if(factory==NULL){
+            /* Maybe the endpoint was pending */
+            celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+            if(!arrayList_removeElement(admin->noSerializerPublications, pubEP)){
+                status = CELIX_ILLEGAL_STATE;
+            }
+            celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+        }
+    }
+    else{
+
+        celixThreadMutex_lock(&admin->externalPublicationsLock);
+        array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
+        if(ext_pub_list!=NULL){
+            int i;
+            bool found = false;
+            for(i=0;!found && i<arrayList_size(ext_pub_list);i++){
+                pubsub_endpoint_pt p  = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+                found = pubsubEndpoint_equals(pubEP,p);
+                if (found){
+                    arrayList_remove(ext_pub_list,i);
+                }
+            }
+            // Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic)
+            for(i=0; i<arrayList_size(ext_pub_list);i++) {
+                pubsub_endpoint_pt p  = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+                const char *otherUrl = properties_get(p->endpoint_props, PUBSUB_ENDPOINT_URL);
+                /* Either URL may be missing; only compare when both are present */
+                if (pubUrl != NULL && otherUrl != NULL && strcmp(pubUrl, otherUrl) == 0) {
+                    count++;
+                }
+            }
+
+            if(arrayList_size(ext_pub_list)==0){
+                hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic);
+                char* topic = (char*)hashMapEntry_getKey(entry);
+                array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry);
+                hashMap_remove(admin->externalPublications,topic);
+                arrayList_destroy(list);
+                free(topic);
+            }
+        }
+
+        celixThreadMutex_unlock(&admin->externalPublicationsLock);
+    }
+
+    /* Check if this publisher was connected to one of our subscribers*/
+    celixThreadMutex_lock(&admin->subscriptionsLock);
+
+    topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
+    if(sub!=NULL && pubUrl!=NULL && count == 0){
+        pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,(char*)pubUrl);
+    }
+
+    /* And check also for ANY subscription */
+    topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
+    if(any_sub!=NULL && pubUrl!=NULL && count == 0){
+        pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,(char*)pubUrl);
+    }
+
+    free(scope_topic);
+    celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+    return status;
+
+}
+
+/**
+ * Stops and destroys the local topic publication registered under the given
+ * scope/topic, if there is one, and removes it from admin->localPublications.
+ *
+ * The whole operation runs with admin->localPublicationsLock held.
+ * The returned value is CELIX_SUCCESS plus the statuses of the stop and
+ * destroy calls added together.
+ */
+celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin, char *scope, char* topic){
+    celix_status_t result = CELIX_SUCCESS;
+
+    if (admin->verbose) {
+        printf("PSA_ZMQ: Closing all publications\n");
+    }
+
+    celixThreadMutex_lock(&admin->localPublicationsLock);
+
+    char *lookupKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    hash_map_entry_pt entry = (hash_map_entry_pt)hashMap_getEntry(admin->localPublications, lookupKey);
+
+    if (entry != NULL) {
+        /* Grab key and value first: both are released after the entry is removed */
+        char *storedKey = (char*)hashMapEntry_getKey(entry);
+        service_factory_pt svcFactory = (service_factory_pt)hashMapEntry_getValue(entry);
+        topic_publication_pt publication = (topic_publication_pt)svcFactory->handle;
+
+        result = result + pubsub_topicPublicationStop(publication);
+        disconnectTopicPubSubFromSerializer(admin, publication, true);
+        result = result + pubsub_topicPublicationDestroy(publication);
+
+        hashMap_remove(admin->localPublications, lookupKey);
+        free(storedKey);
+        free(svcFactory);
+    }
+
+    free(lookupKey);
+    celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+    return result;
+}
+
+/**
+ * Stops and destroys the topic subscription registered under the given
+ * scope/topic, if there is one, and removes it from admin->subscriptions.
+ *
+ * The whole operation runs with admin->subscriptionsLock held.
+ * The returned value is CELIX_SUCCESS plus the statuses of the stop and
+ * destroy calls added together.
+ */
+celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope,char* topic){
+    celix_status_t result = CELIX_SUCCESS;
+
+    if (admin->verbose) {
+        printf("PSA_ZMQ: Closing all subscriptions\n");
+    }
+
+    celixThreadMutex_lock(&admin->subscriptionsLock);
+
+    char *lookupKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
+    hash_map_entry_pt entry = (hash_map_entry_pt)hashMap_getEntry(admin->subscriptions, lookupKey);
+
+    if (entry != NULL) {
+        /* Key stored in the map; it is released once the entry has been removed */
+        char *storedKey = (char*)hashMapEntry_getKey(entry);
+        topic_subscription_pt subscription = (topic_subscription_pt)hashMapEntry_getValue(entry);
+
+        result = result + pubsub_topicSubscriptionStop(subscription);
+        disconnectTopicPubSubFromSerializer(admin, subscription, false);
+        result = result + pubsub_topicSubscriptionDestroy(subscription);
+
+        hashMap_remove(admin->subscriptions, lookupKey);
+        free(storedKey);
+    }
+
+    free(lookupKey);
+    celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+    return result;
+}
+
+
+#ifndef ANDROID
+/**
+ * Looks up the numeric IPv4 address of a network interface.
+ *
+ * interface: name of the interface to use, or NULL to take the first IPv4
+ *            address returned by getifaddrs.
+ * ip:        on success receives a strdup'ed address string that the caller
+ *            must free; left untouched on failure.
+ *
+ * Returns CELIX_SUCCESS when an address was found, CELIX_ENOMEM when the
+ * address could not be copied, CELIX_BUNDLE_EXCEPTION otherwise.
+ */
+static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip) {
+    celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+
+    struct ifaddrs *ifaddr = NULL;
+    struct ifaddrs *ifa = NULL;
+    char host[NI_MAXHOST];
+
+    if (getifaddrs(&ifaddr) == -1) {
+        return status;
+    }
+
+    for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) {
+        /* Check the family first: the sockaddr_in length below is only valid for AF_INET */
+        if (ifa->ifa_addr == NULL || ifa->ifa_addr->sa_family != AF_INET) {
+            continue;
+        }
+
+        if (interface != NULL && strcmp(ifa->ifa_name, interface) != 0) {
+            continue;
+        }
+
+        if (getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) != 0) {
+            continue;
+        }
+
+        char *copy = strdup(host);
+        if (copy == NULL) {
+            /* Do not report success with a NULL address */
+            status = CELIX_ENOMEM;
+        } else {
+            *ip = copy;
+            status = CELIX_SUCCESS;
+        }
+        break;
+    }
+
+    freeifaddrs(ifaddr);
+
+    return status;
+}
+#endif
+
+/**
+ * Parks a subscriber endpoint in admin->pendingSubscriptions under its
+ * scope/topic key, creating the per-topic list on first use.
+ *
+ * This function takes no lock itself.
+ * NOTE(review): presumably the caller holds admin->pendingSubscriptionsLock - confirm.
+ *
+ * Always returns CELIX_SUCCESS.
+ */
+static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){
+    char *key = pubsubEndpoint_createScopeTopicKey(
+            properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_SCOPE),
+            properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC_NAME));
+
+    array_list_pt waiting = hashMap_get(admin->pendingSubscriptions, key);
+    if (waiting == NULL) {
+        /* First pending subscriber for this topic: the map keeps its own copy of the key */
+        arrayList_create(&waiting);
+        hashMap_put(admin->pendingSubscriptions, strdup(key), waiting);
+    }
+    arrayList_add(waiting, subEP);
+
+    free(key);
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Registers a newly available serializer and retries the endpoints that were
+ * parked because no serializer could be found for them.
+ *
+ * handle:    the pubsub admin (cast to pubsub_admin_pt).
+ * reference: service reference of the serializer; must carry the
+ *            PUBSUB_SERIALIZER_TYPE_KEY property.
+ * service:   the serializer service instance (not used here).
+ *
+ * An endpoint that is successfully re-submitted is removed from its
+ * noSerializer list, so that it is not submitted again when a further
+ * serializer shows up and does not linger there after being removed.
+ *
+ * Returns CELIX_SERVICE_EXCEPTION when the reference has no serializer type,
+ * CELIX_SUCCESS otherwise.
+ */
+celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service){
+    /* Assumption: serializers are all available at startup.
+     * If a new (possibly better) serializer is installed and started, already created topic_publications/subscriptions will not be destroyed and recreated */
+
+    celix_status_t status = CELIX_SUCCESS;
+    int i=0;
+
+    const char *serType = NULL;
+    serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType);
+    if (serType == NULL) {
+        fprintf(stderr, "WARNING PSA ZMQ: Serializer serviceReference %p has no %s property specified\n", reference, PUBSUB_SERIALIZER_TYPE_KEY);
+        return CELIX_SERVICE_EXCEPTION;
+    }
+
+    pubsub_admin_pt admin = (pubsub_admin_pt)handle;
+    celixThreadMutex_lock(&admin->serializerListLock);
+    arrayList_add(admin->serializerList, reference);
+    celixThreadMutex_unlock(&admin->serializerListLock);
+
+    /* Now let's re-evaluate the pending */
+    /* NOTE(review): the add calls below run with noSerializerPendingsLock held; pubsubAdmin_addPublication
+     * takes that lock again on its no-serializer path - presumably the mutex is recursive, confirm. */
+    celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+
+    i = 0;
+    while(i<arrayList_size(admin->noSerializerSubscriptions)){
+        pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i);
+        pubsub_serializer_service_t *best_serializer = NULL;
+        pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, NULL);
+        /* Finally we have a valid serializer! */
+        if(best_serializer != NULL && pubsubAdmin_addSubscription(admin, ep) == CELIX_SUCCESS){
+            /* No longer pending: drop it and stay on the same index */
+            arrayList_remove(admin->noSerializerSubscriptions, i);
+        }
+        else{
+            i++;
+        }
+    }
+
+    i = 0;
+    while(i<arrayList_size(admin->noSerializerPublications)){
+        pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i);
+        pubsub_serializer_service_t *best_serializer = NULL;
+        pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, NULL);
+        /* Finally we have a valid serializer! */
+        if(best_serializer != NULL && pubsubAdmin_addPublication(admin, ep) == CELIX_SUCCESS){
+            /* No longer pending: drop it and stay on the same index */
+            arrayList_remove(admin->noSerializerPublications, i);
+        }
+        else{
+            i++;
+        }
+    }
+
+    celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+
+    if (admin->verbose) {
+        printf("PSA_ZMQ: %s serializer added\n", serType);
+    }
+
+    return status;
+}
+/**
+ * Handles the removal of a serializer service.
+ *
+ * handle is cast to the pubsub admin; reference is the serializer's service
+ * reference and service the serializer instance, which is used as the key into
+ * the per-serializer maps.
+ *
+ * The reference is removed from admin->serializerList. Every topic publication
+ * and topic subscription that was registered for this serializer is stopped,
+ * its endpoints are removed through pubsubAdmin_removePublication /
+ * pubsubAdmin_removeSubscription, stripped of their PUBSUB_ENDPOINT_URL and
+ * parked in the matching noSerializer list; the topic object is then dropped
+ * from its admin map and destroyed.
+ *
+ * Returns CELIX_SERVICE_EXCEPTION when the reference has no serializer type
+ * property, CELIX_SUCCESS otherwise (statuses of the individual teardown calls
+ * are not propagated).
+ */
+celix_status_t pubsubAdmin_serializerRemoved(void * handle, 
service_reference_pt reference, void * service){
+
+    pubsub_admin_pt admin = (pubsub_admin_pt)handle;
+    int i=0, j=0;
+    const char *serType = NULL;
+
+    serviceReference_getProperty(reference, 
PUBSUB_SERIALIZER_TYPE_KEY,&serType);
+    if (serType == NULL) {
+        printf("WARNING PSA ZMQ: Serializer serviceReference %p has no %s 
property specified\n", reference, PUBSUB_SERIALIZER_TYPE_KEY);
+        return CELIX_SERVICE_EXCEPTION;
+    }
+
+    celixThreadMutex_lock(&admin->serializerListLock);
+    /* Remove the serializer from the list */
+    arrayList_removeElement(admin->serializerList, reference);
+    celixThreadMutex_unlock(&admin->serializerListLock);
+
+    /* Detach the lists of topic publications/subscriptions registered for this serializer; they are destroyed below */
+    celixThreadMutex_lock(&admin->usedSerializersLock);
+    array_list_pt topicPubList = 
(array_list_pt)hashMap_remove(admin->topicPublicationsPerSerializer, service);
+    array_list_pt topicSubList = 
(array_list_pt)hashMap_remove(admin->topicSubscriptionsPerSerializer, service);
+    celixThreadMutex_unlock(&admin->usedSerializersLock);
+
+    /* Now destroy the topicPublications, but first put back the 
pubsub_endpoints back to the noSerializer pending list */
+    if(topicPubList!=NULL){
+        for(i=0;i<arrayList_size(topicPubList);i++){
+            topic_publication_pt topicPub = 
(topic_publication_pt)arrayList_get(topicPubList,i);
+            /* Stop the topic publication */
+            pubsub_topicPublicationStop(topicPub);
+            /* Get the endpoints that are going to be orphan */
+            array_list_pt pubList = 
pubsub_topicPublicationGetPublisherList(topicPub);
+            for(j=0;j<arrayList_size(pubList);j++){
+                pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pubList,j);
+                /* Remove the publication */
+                pubsubAdmin_removePublication(admin, pubEP);
+                /* Reset the endpoint field, so that will be recreated from 
scratch when a new serializer will be found */
+                if(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL)!=NULL){
+                    properties_unset(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL);
+                }
+                /* Add the orphan endpoint to the noSerializer pending list */
+                celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                arrayList_add(admin->noSerializerPublications,pubEP);
+                celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+            }
+            arrayList_destroy(pubList);
+
+            /* Cleanup also the localPublications hashmap*/
+            celixThreadMutex_lock(&admin->localPublicationsLock);
+            hash_map_iterator_pt iter = 
hashMapIterator_create(admin->localPublications);
+            char *key = NULL;
+            service_factory_pt factory = NULL;
+            while(hashMapIterator_hasNext(iter)){
+                hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+                factory = (service_factory_pt)hashMapEntry_getValue(entry);
+                topic_publication_pt pub = 
(topic_publication_pt)factory->handle;
+                if(pub==topicPub){
+                    key = (char*)hashMapEntry_getKey(entry);
+                    break;
+                }
+            }
+            hashMapIterator_destroy(iter);
+            if(key!=NULL){ /* key is only set when the loop stopped on topicPub's entry, so factory is the matching one */
+                hashMap_remove(admin->localPublications, key);
+                free(factory);
+                free(key);
+            }
+            celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+            /* Finally destroy the topicPublication */
+            pubsub_topicPublicationDestroy(topicPub);
+        }
+        arrayList_destroy(topicPubList);
+    }
+
+    /* Now destroy the topicSubscriptions, but first put back the 
pubsub_endpoints back to the noSerializer pending list */
+    if(topicSubList!=NULL){
+        for(i=0;i<arrayList_size(topicSubList);i++){
+            topic_subscription_pt topicSub = 
(topic_subscription_pt)arrayList_get(topicSubList,i);
+            /* Stop the topic subscription */
+            pubsub_topicSubscriptionStop(topicSub);
+            /* Get the endpoints that are going to be orphan */
+            array_list_pt subList = 
pubsub_topicSubscriptionGetSubscribersList(topicSub);
+            for(j=0;j<arrayList_size(subList);j++){
+                pubsub_endpoint_pt subEP = 
(pubsub_endpoint_pt)arrayList_get(subList,j);
+                /* Remove the subscription */
+                pubsubAdmin_removeSubscription(admin, subEP);
+                /* Reset the endpoint field, so that will be recreated from 
scratch when a new serializer will be found */
+                if(properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_URL)!=NULL){
+                    properties_unset(subEP->endpoint_props, 
PUBSUB_ENDPOINT_URL);
+                }
+                /* Add the orphan endpoint to the noSerializer pending list */
+                celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                arrayList_add(admin->noSerializerSubscriptions,subEP);
+                celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+            }
+            /* NOTE(review): unlike pubList above, subList is not destroyed here - confirm whether GetSubscribersList returns a copy (leak) or the internal list */
+            /* Cleanup also the subscriptions hashmap*/
+            celixThreadMutex_lock(&admin->subscriptionsLock);
+            hash_map_iterator_pt iter = 
hashMapIterator_create(admin->subscriptions);
+            char *key = NULL;
+            while(hashMapIterator_hasNext(iter)){
+                hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+                topic_subscription_pt sub = 
(topic_subscription_pt)hashMapEntry_getValue(entry);
+                if(sub==topicSub){
+                    key = (char*)hashMapEntry_getKey(entry);
+                    break;
+                }
+            }
+            hashMapIterator_destroy(iter);
+            if(key!=NULL){
+                hashMap_remove(admin->subscriptions, key);
+                free(key);
+            }
+            celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+            /* Finally destroy the topicSubscription */
+            pubsub_topicSubscriptionDestroy(topicSub);
+        }
+        arrayList_destroy(topicSubList);
+    }
+
+
+    if (admin->verbose) {
+        printf("PSA_ZMQ: %s serializer removed\n", serType);
+    }
+
+    return CELIX_SUCCESS;
+}
+
+/**
+ * Computes how well this admin matches the given endpoint.
+ *
+ * Delegates to pubsub_admin_match with this admin's type, the framework UUID,
+ * the configured QoS/default scores and the current serializer list (read
+ * under admin->serializerListLock); the result is written to *score.
+ *
+ * Returns CELIX_ILLEGAL_STATE when the framework UUID cannot be read,
+ * otherwise the status of pubsub_admin_match.
+ */
+celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score){
+    celix_status_t result;
+    const char *frameworkUuid = NULL;
+
+    bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &frameworkUuid);
+
+    if (frameworkUuid != NULL) {
+        celixThreadMutex_lock(&admin->serializerListLock);
+        result = pubsub_admin_match(endpoint,
+                PSA_ZMQ_PUBSUB_ADMIN_TYPE,
+                frameworkUuid,
+                admin->qosSampleScore,
+                admin->qosControlScore,
+                admin->defaultScore,
+                admin->serializerList,
+                score);
+        celixThreadMutex_unlock(&admin->serializerListLock);
+    } else {
+        result = CELIX_ILLEGAL_STATE;
+    }
+
+    return result;
+}
+
+/**
+ * Selects the serializer to use for an endpoint; applies the same selection
+ * logic as the match function.
+ *
+ * svcOut:     receives the serializer service, or NULL when none was selected.
+ * serTypeOut: optional; when not NULL and a serializer was selected, receives
+ *             the PUBSUB_SERIALIZER_TYPE_KEY property of its service reference.
+ *             It is left untouched when no serializer was selected.
+ *
+ * Returns the status of pubsub_admin_get_best_serializer.
+ */
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **svcOut, const char **serTypeOut) {
+    service_reference_pt bestRef = NULL;
+
+    celixThreadMutex_lock(&admin->serializerListLock);
+    celix_status_t result = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, &bestRef);
+    celixThreadMutex_unlock(&admin->serializerListLock);
+
+    if (bestRef == NULL) {
+        *svcOut = NULL;
+        return result;
+    }
+
+    pubsub_serializer_service_t *selected = NULL;
+    bundleContext_getService(admin->bundle_context, bestRef, (void**)&selected);
+    //TODO, FIXME this should not be done this way. only unget if the service is not used any more
+    bundleContext_ungetService(admin->bundle_context, bestRef, NULL);
+
+    if (serTypeOut != NULL) {
+        serviceReference_getProperty(bestRef, PUBSUB_SERIALIZER_TYPE_KEY, serTypeOut);
+    }
+
+    *svcOut = selected;
+
+    return result;
+}
+
+/**
+ * Records that a topic publication (isPublication == true) or topic
+ * subscription (isPublication == false) uses the given serializer, by adding
+ * it to the serializer's list in the corresponding per-serializer map.
+ * The list is created on first use. Runs under admin->usedSerializersLock.
+ */
+static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){
+    celixThreadMutex_lock(&admin->usedSerializersLock);
+
+    hash_map_pt perSerializer;
+    if (isPublication) {
+        perSerializer = admin->topicPublicationsPerSerializer;
+    } else {
+        perSerializer = admin->topicSubscriptionsPerSerializer;
+    }
+
+    array_list_pt users = (array_list_pt)hashMap_get(perSerializer, serializer);
+    if (users == NULL) {
+        arrayList_create(&users);
+        hashMap_put(perSerializer, serializer, users);
+    }
+    arrayList_add(users, topicPubSub);
+
+    celixThreadMutex_unlock(&admin->usedSerializersLock);
+}
+
+/**
+ * Removes a topic publication (isPublication == true) or topic subscription
+ * (isPublication == false) from the per-serializer bookkeeping.
+ * The lists of all serializers are searched and the search stops at the first
+ * list the element could be removed from. Runs under admin->usedSerializersLock.
+ */
+static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication){
+    celixThreadMutex_lock(&admin->usedSerializersLock);
+
+    hash_map_pt perSerializer;
+    if (isPublication) {
+        perSerializer = admin->topicPublicationsPerSerializer;
+    } else {
+        perSerializer = admin->topicSubscriptionsPerSerializer;
+    }
+
+    hash_map_iterator_pt it = hashMapIterator_create(perSerializer);
+    bool removed = false;
+    while (!removed && hashMapIterator_hasNext(it)) {
+        array_list_pt users = (array_list_pt)hashMapIterator_nextValue(it);
+        removed = arrayList_removeElement(users, topicPubSub);
+    }
+    hashMapIterator_destroy(it);
+
+    celixThreadMutex_unlock(&admin->usedSerializersLock);
+}
+

Reply via email to