http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_admin_zmq/private/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/psa_activator.c 
b/pubsub/pubsub_admin_zmq/private/src/psa_activator.c
deleted file mode 100644
index fd07310..0000000
--- a/pubsub/pubsub_admin_zmq/private/src/psa_activator.c
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * psa_activator.c
- *
- *  \date       Sep 30, 2011
- *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
- *  \copyright Apache License, Version 2.0
- */
-
-#include <stdlib.h>
-
-#include "bundle_activator.h"
-#include "service_registration.h"
-#include "service_tracker.h"
-
-#include "pubsub_admin_impl.h"
-
-
-struct activator {
-       pubsub_admin_pt admin;
-       pubsub_admin_service_pt adminService;
-       service_registration_pt registration;
-       service_tracker_pt serializerTracker;
-};
-
-celix_status_t bundleActivator_create(bundle_context_pt context, void 
**userData) {
-       celix_status_t status = CELIX_SUCCESS;
-       struct activator *activator;
-
-       activator = calloc(1, sizeof(*activator));
-       if (!activator) {
-               status = CELIX_ENOMEM;
-       }
-       else{
-               *userData = activator;
-
-               status = pubsubAdmin_create(context, &(activator->admin));
-
-               if(status == CELIX_SUCCESS){
-                       service_tracker_customizer_pt customizer = NULL;
-                       status = 
serviceTrackerCustomizer_create(activator->admin,
-                                       NULL,
-                                       pubsubAdmin_serializerAdded,
-                                       NULL,
-                                       pubsubAdmin_serializerRemoved,
-                                       &customizer);
-                       if(status == CELIX_SUCCESS){
-                               status = serviceTracker_create(context, 
PUBSUB_SERIALIZER_SERVICE, customizer, &(activator->serializerTracker));
-                               if(status != CELIX_SUCCESS){
-                                       
serviceTrackerCustomizer_destroy(customizer);
-                                       pubsubAdmin_destroy(activator->admin);
-                               }
-                       }
-                       else{
-                               pubsubAdmin_destroy(activator->admin);
-                       }
-               }
-       }
-
-       return status;
-}
-
-celix_status_t bundleActivator_start(void * userData, bundle_context_pt 
context) {
-       celix_status_t status = CELIX_SUCCESS;
-       struct activator *activator = userData;
-       pubsub_admin_service_pt pubsubAdminSvc = calloc(1, 
sizeof(*pubsubAdminSvc));
-
-       if (!pubsubAdminSvc) {
-               status = CELIX_ENOMEM;
-       }
-       else{
-               pubsubAdminSvc->admin = activator->admin;
-
-               pubsubAdminSvc->addPublication = pubsubAdmin_addPublication;
-               pubsubAdminSvc->removePublication = 
pubsubAdmin_removePublication;
-
-               pubsubAdminSvc->addSubscription = pubsubAdmin_addSubscription;
-               pubsubAdminSvc->removeSubscription = 
pubsubAdmin_removeSubscription;
-
-               pubsubAdminSvc->closeAllPublications = 
pubsubAdmin_closeAllPublications;
-               pubsubAdminSvc->closeAllSubscriptions = 
pubsubAdmin_closeAllSubscriptions;
-
-               pubsubAdminSvc->matchEndpoint = pubsubAdmin_matchEndpoint;
-
-               activator->adminService = pubsubAdminSvc;
-
-               status = bundleContext_registerService(context, 
PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration);
-
-               status += serviceTracker_open(activator->serializerTracker);
-
-       }
-
-
-       return status;
-}
-
-celix_status_t bundleActivator_stop(void * userData, bundle_context_pt 
context) {
-       celix_status_t status = CELIX_SUCCESS;
-       struct activator *activator = userData;
-
-       status += serviceTracker_close(activator->serializerTracker);
-       status += serviceRegistration_unregister(activator->registration);
-
-       activator->registration = NULL;
-
-       free(activator->adminService);
-       activator->adminService = NULL;
-
-       return status;
-}
-
-celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt 
context) {
-       celix_status_t status = CELIX_SUCCESS;
-       struct activator *activator = userData;
-
-       serviceTracker_destroy(activator->serializerTracker);
-       pubsubAdmin_destroy(activator->admin);
-       activator->admin = NULL;
-
-       free(activator);
-
-       return status;
-}
-
-

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c 
b/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
deleted file mode 100644
index 29ead0c..0000000
--- a/pubsub/pubsub_admin_zmq/private/src/pubsub_admin_impl.c
+++ /dev/null
@@ -1,1040 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_admin_impl.c
- *
- *  \date       Sep 30, 2011
- *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
- *  \copyright Apache License, Version 2.0
- */
-
-#include "pubsub_admin_impl.h"
-#include <zmq.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-
-#include <arpa/inet.h>
-#include <sys/socket.h>
-#include <netdb.h>
-
-#ifndef ANDROID
-#include <ifaddrs.h>
-#endif
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <string.h>
-
-#include "constants.h"
-#include "utils.h"
-#include "hash_map.h"
-#include "array_list.h"
-#include "bundle_context.h"
-#include "bundle.h"
-#include "service_reference.h"
-#include "service_registration.h"
-#include "log_helper.h"
-#include "log_service.h"
-#include "celix_threads.h"
-#include "service_factory.h"
-
-#include "topic_subscription.h"
-#include "topic_publication.h"
-#include "pubsub_endpoint.h"
-#include "pubsub_utils.h"
-#include "subscriber.h"
-
-#define MAX_KEY_FOLDER_PATH_LENGTH 512
-
-static const char *DEFAULT_IP = "127.0.0.1";
-
-static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** 
ip);
-static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
-static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
-
-static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt 
admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc);
-static void connectTopicPubSubToSerializer(pubsub_admin_pt 
admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool 
isPublication);
-static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void 
*topicPubSub,bool isPublication);
-
-celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt 
*admin) {
-       celix_status_t status = CELIX_SUCCESS;
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-       if (!zsys_has_curve()){
-               printf("PSA_ZMQ: zeromq curve unsupported\n");
-               return CELIX_SERVICE_EXCEPTION;
-       }
-#endif
-
-       *admin = calloc(1, sizeof(**admin));
-
-       if (!*admin) {
-               status = CELIX_ENOMEM;
-       }
-       else{
-
-               const char *ip = NULL;
-               char *detectedIp = NULL;
-               (*admin)->bundle_context= context;
-               (*admin)->localPublications = hashMap_create(utils_stringHash, 
NULL, utils_stringEquals, NULL);
-               (*admin)->subscriptions = hashMap_create(utils_stringHash, 
NULL, utils_stringEquals, NULL);
-               (*admin)->pendingSubscriptions = 
hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-               (*admin)->externalPublications = 
hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-               (*admin)->topicSubscriptionsPerSerializer = 
hashMap_create(NULL, NULL, NULL, NULL);
-               (*admin)->topicPublicationsPerSerializer  = 
hashMap_create(NULL, NULL, NULL, NULL);
-               arrayList_create(&((*admin)->noSerializerSubscriptions));
-               arrayList_create(&((*admin)->noSerializerPublications));
-               arrayList_create(&((*admin)->serializerList));
-
-               celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL);
-               celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL);
-               celixThreadMutex_create(&(*admin)->externalPublicationsLock, 
NULL);
-               celixThreadMutex_create(&(*admin)->serializerListLock, NULL);
-               celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL);
-
-               
celixThreadMutexAttr_create(&(*admin)->noSerializerPendingsAttr);
-               
celixThreadMutexAttr_settype(&(*admin)->noSerializerPendingsAttr, 
CELIX_THREAD_MUTEX_RECURSIVE);
-               celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, 
&(*admin)->noSerializerPendingsAttr);
-
-               
celixThreadMutexAttr_create(&(*admin)->pendingSubscriptionsAttr);
-               
celixThreadMutexAttr_settype(&(*admin)->pendingSubscriptionsAttr, 
CELIX_THREAD_MUTEX_RECURSIVE);
-               celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, 
&(*admin)->pendingSubscriptionsAttr);
-
-               if (logHelper_create(context, &(*admin)->loghelper) == 
CELIX_SUCCESS) {
-                       logHelper_start((*admin)->loghelper);
-               }
-
-               bundleContext_getProperty(context,PSA_IP , &ip);
-
-#ifndef ANDROID
-               if (ip == NULL) {
-                       const char *interface = NULL;
-
-                       bundleContext_getProperty(context, PSA_ITF, &interface);
-                       if (pubsubAdmin_getIpAdress(interface, &detectedIp) != 
CELIX_SUCCESS) {
-                               logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: Could not retrieve IP adress for interface 
%s", interface);
-                       }
-
-                       ip = detectedIp;
-               }
-#endif
-
-               if (ip != NULL) {
-                       logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %s for service annunciation", ip);
-                       (*admin)->ipAddress = strdup(ip);
-               }
-               else {
-                       logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: No IP address for service annunciation set. 
Using %s", DEFAULT_IP);
-                       (*admin)->ipAddress = strdup(DEFAULT_IP);
-               }
-
-               if (detectedIp != NULL) {
-                       free(detectedIp);
-               }
-
-               const char* basePortStr = NULL;
-               const char* maxPortStr = NULL;
-               char* endptrBase = NULL;
-               char* endptrMax = NULL;
-               bundleContext_getPropertyWithDefault(context, 
PSA_ZMQ_BASE_PORT, "PSA_ZMQ_DEFAULT_BASE_PORT", &basePortStr);
-               bundleContext_getPropertyWithDefault(context, PSA_ZMQ_MAX_PORT, 
"PSA_ZMQ_DEFAULT_MAX_PORT", &maxPortStr);
-               (*admin)->basePort = strtol(basePortStr, &endptrBase, 10);
-               (*admin)->maxPort = strtol(maxPortStr, &endptrMax, 10);
-               if (*endptrBase != '\0') {
-                       (*admin)->basePort = PSA_ZMQ_DEFAULT_BASE_PORT;
-               }
-               if (*endptrMax != '\0') {
-                       (*admin)->maxPort = PSA_ZMQ_DEFAULT_MAX_PORT;
-               }
-
-               printf("PSA Using base port %u to max port %u\n", 
(*admin)->basePort, (*admin)->maxPort);
-
-               // Disable Signal Handling by CZMQ
-               setenv("ZSYS_SIGHANDLER", "false", true);
-
-               const char *nrZmqThreads = NULL;
-               bundleContext_getProperty(context, "PSA_NR_ZMQ_THREADS", 
&nrZmqThreads);
-
-               if(nrZmqThreads != NULL) {
-                       char *endPtr = NULL;
-                       unsigned int nrThreads = strtoul(nrZmqThreads, &endPtr, 
10);
-                       if(endPtr != nrZmqThreads && nrThreads > 0 && nrThreads 
< 50) {
-                               zsys_set_io_threads(nrThreads);
-                               logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %d threads for ZMQ", nrThreads);
-                               printf("PSA_ZMQ: Using %d threads for ZMQ\n", 
nrThreads);
-                       }
-               }
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-               // Setup authenticator
-               zactor_t* auth = zactor_new (zauth, NULL);
-               zstr_sendx(auth, "VERBOSE", NULL);
-
-               // Load all public keys of subscribers into the application
-               // This step is done for authenticating subscribers
-               char curve_folder_path[MAX_KEY_FOLDER_PATH_LENGTH];
-               char* keys_bundle_dir = pubsub_getKeysBundleDir(context);
-               snprintf(curve_folder_path, MAX_KEY_FOLDER_PATH_LENGTH, 
"%s/META-INF/keys/subscriber/public", keys_bundle_dir);
-               zstr_sendx (auth, "CURVE", curve_folder_path, NULL);
-               free(keys_bundle_dir);
-
-               (*admin)->zmq_auth = auth;
-#endif
-
-       }
-
-       return status;
-}
-
-
-celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
-{
-       celix_status_t status = CELIX_SUCCESS;
-
-       free(admin->ipAddress);
-
-       celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
-       hash_map_iterator_pt iter = 
hashMapIterator_create(admin->pendingSubscriptions);
-       while(hashMapIterator_hasNext(iter)){
-               hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
-               free((char*)hashMapEntry_getKey(entry));
-               arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry));
-       }
-       hashMapIterator_destroy(iter);
-       hashMap_destroy(admin->pendingSubscriptions,false,false);
-       celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
-
-       celixThreadMutex_lock(&admin->subscriptionsLock);
-       hashMap_destroy(admin->subscriptions,false,false);
-       celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-       celixThreadMutex_lock(&admin->localPublicationsLock);
-       hashMap_destroy(admin->localPublications,true,false);
-       celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-       celixThreadMutex_lock(&admin->externalPublicationsLock);
-       iter = hashMapIterator_create(admin->externalPublications);
-       while(hashMapIterator_hasNext(iter)){
-               hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
-               free((char*)hashMapEntry_getKey(entry));
-               arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry));
-       }
-       hashMapIterator_destroy(iter);
-       hashMap_destroy(admin->externalPublications,false,false);
-       celixThreadMutex_unlock(&admin->externalPublicationsLock);
-
-       celixThreadMutex_lock(&admin->serializerListLock);
-       arrayList_destroy(admin->serializerList);
-       celixThreadMutex_unlock(&admin->serializerListLock);
-
-       celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-       arrayList_destroy(admin->noSerializerSubscriptions);
-       arrayList_destroy(admin->noSerializerPublications);
-       celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-
-
-       celixThreadMutex_lock(&admin->usedSerializersLock);
-
-       iter = hashMapIterator_create(admin->topicSubscriptionsPerSerializer);
-       while(hashMapIterator_hasNext(iter)){
-               
arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter));
-       }
-       hashMapIterator_destroy(iter);
-       hashMap_destroy(admin->topicSubscriptionsPerSerializer,false,false);
-
-       iter = hashMapIterator_create(admin->topicPublicationsPerSerializer);
-       while(hashMapIterator_hasNext(iter)){
-               
arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter));
-       }
-       hashMapIterator_destroy(iter);
-       hashMap_destroy(admin->topicPublicationsPerSerializer,false,false);
-
-       celixThreadMutex_unlock(&admin->usedSerializersLock);
-
-       celixThreadMutex_destroy(&admin->usedSerializersLock);
-       celixThreadMutex_destroy(&admin->serializerListLock);
-       celixThreadMutex_destroy(&admin->pendingSubscriptionsLock);
-
-       celixThreadMutexAttr_destroy(&admin->noSerializerPendingsAttr);
-       celixThreadMutex_destroy(&admin->noSerializerPendingsLock);
-
-       celixThreadMutexAttr_destroy(&admin->pendingSubscriptionsAttr);
-       celixThreadMutex_destroy(&admin->subscriptionsLock);
-
-       celixThreadMutex_destroy(&admin->localPublicationsLock);
-       celixThreadMutex_destroy(&admin->externalPublicationsLock);
-
-       logHelper_stop(admin->loghelper);
-
-       logHelper_destroy(&admin->loghelper);
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-       if (admin->zmq_auth != NULL){
-               zactor_destroy(&(admin->zmq_auth));
-       }
-#endif
-
-       free(admin);
-
-       return status;
-}
-
-static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
-       celix_status_t status = CELIX_SUCCESS;
-
-       celixThreadMutex_lock(&admin->subscriptionsLock);
-
-       topic_subscription_pt any_sub = 
hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
-
-       if(any_sub==NULL){
-
-               int i;
-               pubsub_serializer_service_t *best_serializer = NULL;
-               if( (status=pubsubAdmin_getBestSerializer(admin, subEP, 
&best_serializer)) == CELIX_SUCCESS){
-                       status = 
pubsub_topicSubscriptionCreate(admin->bundle_context, 
PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, 
&any_sub);
-               }
-               else{
-                       printf("PSA_ZMQ: Cannot find a serializer for 
subscribing topic %s. Adding it to pending list.\n",subEP->topic);
-                       celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-                       arrayList_add(admin->noSerializerSubscriptions,subEP);
-                       
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-               }
-
-               if (status == CELIX_SUCCESS){
-
-                       /* Connect all internal publishers */
-                       celixThreadMutex_lock(&admin->localPublicationsLock);
-                       hash_map_iterator_pt lp_iter 
=hashMapIterator_create(admin->localPublications);
-                       while(hashMapIterator_hasNext(lp_iter)){
-                               service_factory_pt factory = 
(service_factory_pt)hashMapIterator_nextValue(lp_iter);
-                               topic_publication_pt topic_pubs = 
(topic_publication_pt)factory->handle;
-                               array_list_pt topic_publishers = 
pubsub_topicPublicationGetPublisherList(topic_pubs);
-
-                               if(topic_publishers!=NULL){
-                                       
for(i=0;i<arrayList_size(topic_publishers);i++){
-                                               pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
-                                               if(pubEP->endpoint !=NULL){
-                                                       status += 
pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
-                                               }
-                                       }
-                                       arrayList_destroy(topic_publishers);
-                               }
-                       }
-                       hashMapIterator_destroy(lp_iter);
-                       celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-                       /* Connect also all external publishers */
-                       celixThreadMutex_lock(&admin->externalPublicationsLock);
-                       hash_map_iterator_pt extp_iter 
=hashMapIterator_create(admin->externalPublications);
-                       while(hashMapIterator_hasNext(extp_iter)){
-                               array_list_pt ext_pub_list = 
(array_list_pt)hashMapIterator_nextValue(extp_iter);
-                               if(ext_pub_list!=NULL){
-                                       
for(i=0;i<arrayList_size(ext_pub_list);i++){
-                                               pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-                                               if(pubEP->endpoint !=NULL){
-                                                       status += 
pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
-                                               }
-                                       }
-                               }
-                       }
-                       hashMapIterator_destroy(extp_iter);
-                       
celixThreadMutex_unlock(&admin->externalPublicationsLock);
-
-
-                       pubsub_topicSubscriptionAddSubscriber(any_sub,subEP);
-
-                       status += pubsub_topicSubscriptionStart(any_sub);
-
-               }
-
-               if (status == CELIX_SUCCESS){
-                       
hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub);
-                       connectTopicPubSubToSerializer(admin, best_serializer, 
any_sub, false);
-               }
-
-       }
-
-       celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-       return status;
-}
-
-celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
-       celix_status_t status = CELIX_SUCCESS;
-
-       printf("PSA_ZMQ: Received subscription [FWUUID=%s bundleID=%ld 
scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, 
subEP->topic);
-
-       if(strcmp(subEP->topic,PUBSUB_ANY_SUB_TOPIC)==0){
-               return pubsubAdmin_addAnySubscription(admin,subEP);
-       }
-
-       /* Check if we already know some publisher about this topic, otherwise 
let's put the subscription in the pending hashmap */
-       celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
-       celixThreadMutex_lock(&admin->subscriptionsLock);
-       celixThreadMutex_lock(&admin->localPublicationsLock);
-       celixThreadMutex_lock(&admin->externalPublicationsLock);
-
-       char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
-
-       service_factory_pt factory = 
(service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
-       array_list_pt ext_pub_list = 
(array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
-
-       if(factory==NULL && ext_pub_list==NULL){ //No (local or external) 
publishers yet for this topic
-               pubsubAdmin_addSubscriptionToPendingList(admin,subEP);
-       }
-       else{
-               int i;
-               topic_subscription_pt subscription = 
hashMap_get(admin->subscriptions, scope_topic);
-
-               if(subscription == NULL) {
-                       pubsub_serializer_service_t *best_serializer = NULL;
-                       if( (status=pubsubAdmin_getBestSerializer(admin, subEP, 
&best_serializer)) == CELIX_SUCCESS){
-                               status += 
pubsub_topicSubscriptionCreate(admin->bundle_context,subEP->scope, 
subEP->topic, best_serializer, &subscription);
-                       }
-                       else{
-                               printf("PSA_ZMQ: Cannot find a serializer for 
subscribing topic %s. Adding it to pending list.\n",subEP->topic);
-                               
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-                               
arrayList_add(admin->noSerializerSubscriptions,subEP);
-                               
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-                       }
-
-                       if (status==CELIX_SUCCESS){
-
-                               /* Try to connect internal publishers */
-                               if(factory!=NULL){
-                                       topic_publication_pt topic_pubs = 
(topic_publication_pt)factory->handle;
-                                       array_list_pt topic_publishers = 
pubsub_topicPublicationGetPublisherList(topic_pubs);
-
-                                       if(topic_publishers!=NULL){
-                                               
for(i=0;i<arrayList_size(topic_publishers);i++){
-                                                       pubsub_endpoint_pt 
pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
-                                                       if(pubEP->endpoint 
!=NULL){
-                                                               status += 
pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
-                                                       }
-                                               }
-                                               
arrayList_destroy(topic_publishers);
-                                       }
-
-                               }
-
-                               /* Look also for external publishers */
-                               if(ext_pub_list!=NULL){
-                                       
for(i=0;i<arrayList_size(ext_pub_list);i++){
-                                               pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-                                               if(pubEP->endpoint !=NULL){
-                                                       status += 
pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
-                                               }
-                                       }
-                               }
-
-                               
pubsub_topicSubscriptionAddSubscriber(subscription,subEP);
-
-                               status += 
pubsub_topicSubscriptionStart(subscription);
-
-                       }
-
-                       if(status==CELIX_SUCCESS){
-
-                               
hashMap_put(admin->subscriptions,strdup(scope_topic),subscription);
-
-                               connectTopicPubSubToSerializer(admin, 
best_serializer, subscription, false);
-                       }
-               }
-
-               if (status == CELIX_SUCCESS){
-                       pubsub_topicIncreaseNrSubscribers(subscription);
-               }
-       }
-
-       free(scope_topic);
-       celixThreadMutex_unlock(&admin->externalPublicationsLock);
-       celixThreadMutex_unlock(&admin->localPublicationsLock);
-       celixThreadMutex_unlock(&admin->subscriptionsLock);
-       celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
-
-       return status;
-
-}
-
-celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
-       celix_status_t status = CELIX_SUCCESS;
-
-       printf("PSA_ZMQ: Removing subscription [FWUUID=%s bundleID=%ld 
topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->topic);
-
-       char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
-
-       celixThreadMutex_lock(&admin->subscriptionsLock);
-       topic_subscription_pt sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
-       if(sub!=NULL){
-               pubsub_topicDecreaseNrSubscribers(sub);
-               if(pubsub_topicGetNrSubscribers(sub) == 0) {
-                       status = 
pubsub_topicSubscriptionRemoveSubscriber(sub,subEP);
-               }
-       }
-       celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-       if(sub==NULL){
-               /* Maybe the endpoint was pending */
-               celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-               if(!arrayList_removeElement(admin->noSerializerSubscriptions, 
subEP)){
-                       status = CELIX_ILLEGAL_STATE;
-               }
-               celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-       }
-
-       free(scope_topic);
-
-
-
-       return status;
-
-}
-
-celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, 
pubsub_endpoint_pt pubEP) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       printf("PSA_ZMQ: Received publication [FWUUID=%s bundleID=%ld scope=%s, 
topic=%s]\n", pubEP->frameworkUUID, pubEP->serviceID, pubEP->scope, 
pubEP->topic);
-
-       const char* fwUUID = NULL;
-
-       bundleContext_getProperty(admin->bundle_context, 
OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
-       if (fwUUID == NULL) {
-               printf("PSA_ZMQ: Cannot retrieve fwUUID.\n");
-               return CELIX_INVALID_BUNDLE_CONTEXT;
-       }
-
-       char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
-
-       if ((strcmp(pubEP->frameworkUUID, fwUUID) == 0) && (pubEP->endpoint == 
NULL)) {
-
-               celixThreadMutex_lock(&admin->localPublicationsLock);
-
-               service_factory_pt factory = (service_factory_pt) 
hashMap_get(admin->localPublications, scope_topic);
-
-               if (factory == NULL) {
-                       topic_publication_pt pub = NULL;
-                       pubsub_serializer_service_t *best_serializer = NULL;
-                       if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, 
&best_serializer)) == CELIX_SUCCESS){
-                               status = 
pubsub_topicPublicationCreate(admin->bundle_context, pubEP, best_serializer, 
admin->ipAddress, admin->basePort, admin->maxPort, &pub);
-                       }
-                       else{
-                               printf("PSA_ZMQ: Cannot find a serializer for 
publishing topic %s. Adding it to pending list.\n", pubEP->topic);
-                               
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-                               
arrayList_add(admin->noSerializerPublications,pubEP);
-                               
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-                       }
-
-                       if (status == CELIX_SUCCESS) {
-                               status = 
pubsub_topicPublicationStart(admin->bundle_context, pub, &factory);
-                               if (status == CELIX_SUCCESS && factory != NULL) 
{
-                                       hashMap_put(admin->localPublications, 
strdup(scope_topic), factory);
-                                       connectTopicPubSubToSerializer(admin, 
best_serializer, pub, true);
-                               }
-                       } else {
-                               printf("PSA_ZMQ: Cannot create a 
topicPublication for scope=%s, topic=%s (bundle %ld).\n", pubEP->scope, 
pubEP->topic, pubEP->serviceID);
-                       }
-               } else {
-                       //just add the new EP to the list
-                       topic_publication_pt pub = (topic_publication_pt) 
factory->handle;
-                       pubsub_topicPublicationAddPublisherEP(pub, pubEP);
-               }
-
-               celixThreadMutex_unlock(&admin->localPublicationsLock);
-       }
-       else{
-
-               celixThreadMutex_lock(&admin->externalPublicationsLock);
-               array_list_pt ext_pub_list = (array_list_pt) 
hashMap_get(admin->externalPublications, scope_topic);
-               if (ext_pub_list == NULL) {
-                       arrayList_create(&ext_pub_list);
-                       hashMap_put(admin->externalPublications, 
strdup(scope_topic), ext_pub_list);
-               }
-
-               arrayList_add(ext_pub_list, pubEP);
-
-               celixThreadMutex_unlock(&admin->externalPublicationsLock);
-       }
-
-       /* Re-evaluate the pending subscriptions */
-       celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
-
-       hash_map_entry_pt pendingSub = 
hashMap_getEntry(admin->pendingSubscriptions, scope_topic);
-       if (pendingSub != NULL) { //There were pending subscription for the 
just published topic. Let's connect them.
-               char* topic = (char*) hashMapEntry_getKey(pendingSub);
-               array_list_pt pendingSubList = (array_list_pt) 
hashMapEntry_getValue(pendingSub);
-               int i;
-               for (i = 0; i < arrayList_size(pendingSubList); i++) {
-                       pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) 
arrayList_get(pendingSubList, i);
-                       pubsubAdmin_addSubscription(admin, subEP);
-               }
-               hashMap_remove(admin->pendingSubscriptions, scope_topic);
-               arrayList_clear(pendingSubList);
-               arrayList_destroy(pendingSubList);
-               free(topic);
-       }
-
-       celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
-
-       /* Connect the new publisher to the subscription for his topic, if 
there is any */
-       celixThreadMutex_lock(&admin->subscriptionsLock);
-
-       topic_subscription_pt sub = (topic_subscription_pt) 
hashMap_get(admin->subscriptions, scope_topic);
-       if (sub != NULL && pubEP->endpoint != NULL) {
-               pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, 
pubEP->endpoint);
-       }
-
-       /* And check also for ANY subscription */
-       topic_subscription_pt any_sub = (topic_subscription_pt) 
hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
-       if (any_sub != NULL && pubEP->endpoint != NULL) {
-               
pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, 
pubEP->endpoint);
-       }
-
-       free(scope_topic);
-
-       celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-       return status;
-
-}
-
-celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt 
admin,pubsub_endpoint_pt pubEP){
-       celix_status_t status = CELIX_SUCCESS;
-       int count = 0;
-
-       printf("PSA_ZMQ: Removing publication [FWUUID=%s bundleID=%ld 
topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->topic);
-
-       const char* fwUUID = NULL;
-
-       
bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-       if(fwUUID==NULL){
-               printf("PSA_ZMQ: Cannot retrieve fwUUID.\n");
-               return CELIX_INVALID_BUNDLE_CONTEXT;
-       }
-       char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
-
-       if(strcmp(pubEP->frameworkUUID,fwUUID)==0){
-
-               celixThreadMutex_lock(&admin->localPublicationsLock);
-               service_factory_pt factory = 
(service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
-               if(factory!=NULL){
-                       topic_publication_pt pub = 
(topic_publication_pt)factory->handle;
-                       pubsub_topicPublicationRemovePublisherEP(pub,pubEP);
-               }
-               celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-               if(factory==NULL){
-                       /* Maybe the endpoint was pending */
-                       celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-                       
if(!arrayList_removeElement(admin->noSerializerPublications, pubEP)){
-                               status = CELIX_ILLEGAL_STATE;
-                       }
-                       
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-               }
-       }
-       else{
-
-               celixThreadMutex_lock(&admin->externalPublicationsLock);
-               array_list_pt ext_pub_list = 
(array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
-               if(ext_pub_list!=NULL){
-                       int i;
-                       bool found = false;
-                       for(i=0;!found && i<arrayList_size(ext_pub_list);i++){
-                               pubsub_endpoint_pt p  = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-                               found = pubsubEndpoint_equals(pubEP,p);
-                               if (found){
-                                       arrayList_remove(ext_pub_list,i);
-                               }
-                       }
-                       // Check if there are more publishers on the same 
endpoint (happens when 1 celix-instance with multiple bundles publish in same 
topic)
-                       for(i=0; i<arrayList_size(ext_pub_list);i++) {
-                               pubsub_endpoint_pt p  = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-                               if (strcmp(pubEP->endpoint,p->endpoint) == 0) {
-                                       count++;
-                               }
-                       }
-
-                       if(arrayList_size(ext_pub_list)==0){
-                               hash_map_entry_pt entry = 
hashMap_getEntry(admin->externalPublications,scope_topic);
-                               char* topic = (char*)hashMapEntry_getKey(entry);
-                               array_list_pt list = 
(array_list_pt)hashMapEntry_getValue(entry);
-                               
hashMap_remove(admin->externalPublications,topic);
-                               arrayList_destroy(list);
-                               free(topic);
-                       }
-               }
-
-               celixThreadMutex_unlock(&admin->externalPublicationsLock);
-       }
-
-       /* Check if this publisher was connected to one of our subscribers*/
-       celixThreadMutex_lock(&admin->subscriptionsLock);
-
-       topic_subscription_pt sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
-       if(sub!=NULL && pubEP->endpoint!=NULL && count == 0){
-               
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint);
-       }
-
-       /* And check also for ANY subscription */
-       topic_subscription_pt any_sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
-       if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){
-               
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,pubEP->endpoint);
-       }
-
-       free(scope_topic);
-       celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-       return status;
-
-}
-
-celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin, char 
*scope, char* topic){
-       celix_status_t status = CELIX_SUCCESS;
-
-       printf("PSA_ZMQ: Closing all publications\n");
-
-       celixThreadMutex_lock(&admin->localPublicationsLock);
-       char *scope_topic = createScopeTopicKey(scope, topic);
-       hash_map_entry_pt pubsvc_entry = 
(hash_map_entry_pt)hashMap_getEntry(admin->localPublications,scope_topic);
-       if(pubsvc_entry!=NULL){
-               char* key = (char*)hashMapEntry_getKey(pubsvc_entry);
-               service_factory_pt factory= 
(service_factory_pt)hashMapEntry_getValue(pubsvc_entry);
-               topic_publication_pt pub = 
(topic_publication_pt)factory->handle;
-               status += pubsub_topicPublicationStop(pub);
-               disconnectTopicPubSubFromSerializer(admin, pub, true);
-               status += pubsub_topicPublicationDestroy(pub);
-               hashMap_remove(admin->localPublications,scope_topic);
-               free(key);
-               free(factory);
-       }
-       free(scope_topic);
-       celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-       return status;
-
-}
-
-celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* 
scope,char* topic){
-       celix_status_t status = CELIX_SUCCESS;
-
-       printf("PSA_ZMQ: Closing all subscriptions\n");
-
-       celixThreadMutex_lock(&admin->subscriptionsLock);
-       char *scope_topic = createScopeTopicKey(scope, topic);
-       hash_map_entry_pt sub_entry = 
(hash_map_entry_pt)hashMap_getEntry(admin->subscriptions,scope_topic);
-       if(sub_entry!=NULL){
-               char* topic = (char*)hashMapEntry_getKey(sub_entry);
-
-               topic_subscription_pt ts = 
(topic_subscription_pt)hashMapEntry_getValue(sub_entry);
-               status += pubsub_topicSubscriptionStop(ts);
-               disconnectTopicPubSubFromSerializer(admin, ts, false);
-               status += pubsub_topicSubscriptionDestroy(ts);
-               hashMap_remove(admin->subscriptions,scope_topic);
-               free(topic);
-
-       }
-       free(scope_topic);
-       celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-       return status;
-
-}
-
-
-#ifndef ANDROID
-static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** 
ip) {
-       celix_status_t status = CELIX_BUNDLE_EXCEPTION;
-
-       struct ifaddrs *ifaddr, *ifa;
-       char host[NI_MAXHOST];
-
-       if (getifaddrs(&ifaddr) != -1)
-       {
-               for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa 
= ifa->ifa_next)
-               {
-                       if (ifa->ifa_addr == NULL)
-                               continue;
-
-                       if ((getnameinfo(ifa->ifa_addr,sizeof(struct 
sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) && 
(ifa->ifa_addr->sa_family == AF_INET)) {
-                               if (interface == NULL) {
-                                       *ip = strdup(host);
-                                       status = CELIX_SUCCESS;
-                               }
-                               else if (strcmp(ifa->ifa_name, interface) == 0) 
{
-                                       *ip = strdup(host);
-                                       status = CELIX_SUCCESS;
-                               }
-                       }
-               }
-
-               freeifaddrs(ifaddr);
-       }
-
-       return status;
-}
-#endif
-
-static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
-       celix_status_t status = CELIX_SUCCESS;
-       char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
-       array_list_pt pendingListPerTopic = 
hashMap_get(admin->pendingSubscriptions,scope_topic);
-       if(pendingListPerTopic==NULL){
-               arrayList_create(&pendingListPerTopic);
-               
hashMap_put(admin->pendingSubscriptions,strdup(scope_topic),pendingListPerTopic);
-       }
-       arrayList_add(pendingListPerTopic,subEP);
-       free(scope_topic);
-       return status;
-}
-
-celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt 
reference, void * service){
-       /* Assumption: serializers are all available at startup.
-        * If a new (possibly better) serializer is installed and started, 
already created topic_publications/subscriptions will not be destroyed and 
recreated */
-
-       celix_status_t status = CELIX_SUCCESS;
-       int i=0;
-
-       const char *serType = NULL;
-       serviceReference_getProperty(reference, 
PUBSUB_SERIALIZER_TYPE_KEY,&serType);
-       if(serType == NULL){
-               printf("Serializer serviceReference %p has no 
pubsub_serializer.type property specified\n",reference);
-               return CELIX_SERVICE_EXCEPTION;
-       }
-
-       pubsub_admin_pt admin = (pubsub_admin_pt)handle;
-       celixThreadMutex_lock(&admin->serializerListLock);
-       arrayList_add(admin->serializerList, reference);
-       celixThreadMutex_unlock(&admin->serializerListLock);
-
-       /* Now let's re-evaluate the pending */
-       celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-
-       for(i=0;i<arrayList_size(admin->noSerializerSubscriptions);i++){
-               pubsub_endpoint_pt ep = 
(pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i);
-               pubsub_serializer_service_t *best_serializer = NULL;
-               pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
-               if(best_serializer != NULL){ /* Finally we have a valid 
serializer! */
-                       pubsubAdmin_addSubscription(admin, ep);
-               }
-       }
-
-       for(i=0;i<arrayList_size(admin->noSerializerPublications);i++){
-               pubsub_endpoint_pt ep = 
(pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i);
-               pubsub_serializer_service_t *best_serializer = NULL;
-               pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
-               if(best_serializer != NULL){ /* Finally we have a valid 
serializer! */
-                       pubsubAdmin_addPublication(admin, ep);
-               }
-       }
-
-       celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-
-       printf("PSA_ZMQ: %s serializer added\n",serType);
-
-       return status;
-}
-
-celix_status_t pubsubAdmin_serializerRemoved(void * handle, 
service_reference_pt reference, void * service){
-
-       pubsub_admin_pt admin = (pubsub_admin_pt)handle;
-       int i=0, j=0;
-       const char *serType = NULL;
-
-       serviceReference_getProperty(reference, 
PUBSUB_SERIALIZER_TYPE_KEY,&serType);
-       if(serType == NULL){
-               printf("Serializer serviceReference %p has no 
pubsub_serializer.type property specified\n",reference);
-               return CELIX_SERVICE_EXCEPTION;
-       }
-
-       celixThreadMutex_lock(&admin->serializerListLock);
-       /* Remove the serializer from the list */
-       arrayList_removeElement(admin->serializerList, reference);
-       celixThreadMutex_unlock(&admin->serializerListLock);
-
-
-       celixThreadMutex_lock(&admin->usedSerializersLock);
-       array_list_pt topicPubList = 
(array_list_pt)hashMap_remove(admin->topicPublicationsPerSerializer, service);
-       array_list_pt topicSubList = 
(array_list_pt)hashMap_remove(admin->topicSubscriptionsPerSerializer, service);
-       celixThreadMutex_unlock(&admin->usedSerializersLock);
-
-       /* Now destroy the topicPublications, but first put back the 
pubsub_endpoints back to the noSerializer pending list */
-       if(topicPubList!=NULL){
-               for(i=0;i<arrayList_size(topicPubList);i++){
-                       topic_publication_pt topicPub = 
(topic_publication_pt)arrayList_get(topicPubList,i);
-                       /* Stop the topic publication */
-                       pubsub_topicPublicationStop(topicPub);
-                       /* Get the endpoints that are going to be orphan */
-                       array_list_pt pubList = 
pubsub_topicPublicationGetPublisherList(topicPub);
-                       for(j=0;j<arrayList_size(pubList);j++){
-                               pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pubList,j);
-                               /* Remove the publication */
-                               pubsubAdmin_removePublication(admin, pubEP);
-                               /* Reset the endpoint field, so that will be 
recreated from scratch when a new serializer will be found */
-                               if(pubEP->endpoint!=NULL){
-                                       free(pubEP->endpoint);
-                                       pubEP->endpoint = NULL;
-                               }
-                               /* Add the orphan endpoint to the noSerializer 
pending list */
-                               
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-                               
arrayList_add(admin->noSerializerPublications,pubEP);
-                               
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-                       }
-                       arrayList_destroy(pubList);
-
-                       /* Cleanup also the localPublications hashmap*/
-                       celixThreadMutex_lock(&admin->localPublicationsLock);
-                       hash_map_iterator_pt iter = 
hashMapIterator_create(admin->localPublications);
-                       char *key = NULL;
-                       service_factory_pt factory = NULL;
-                       while(hashMapIterator_hasNext(iter)){
-                               hash_map_entry_pt entry = 
hashMapIterator_nextEntry(iter);
-                               factory = 
(service_factory_pt)hashMapEntry_getValue(entry);
-                               topic_publication_pt pub = 
(topic_publication_pt)factory->handle;
-                               if(pub==topicPub){
-                                       key = (char*)hashMapEntry_getKey(entry);
-                                       break;
-                               }
-                       }
-                       hashMapIterator_destroy(iter);
-                       if(key!=NULL){
-                               hashMap_remove(admin->localPublications, key);
-                               free(factory);
-                               free(key);
-                       }
-                       celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-                       /* Finally destroy the topicPublication */
-                       pubsub_topicPublicationDestroy(topicPub);
-               }
-               arrayList_destroy(topicPubList);
-       }
-
-       /* Now destroy the topicSubscriptions, but first put back the 
pubsub_endpoints back to the noSerializer pending list */
-       if(topicSubList!=NULL){
-               for(i=0;i<arrayList_size(topicSubList);i++){
-                       topic_subscription_pt topicSub = 
(topic_subscription_pt)arrayList_get(topicSubList,i);
-                       /* Stop the topic subscription */
-                       pubsub_topicSubscriptionStop(topicSub);
-                       /* Get the endpoints that are going to be orphan */
-                       array_list_pt subList = 
pubsub_topicSubscriptionGetSubscribersList(topicSub);
-                       for(j=0;j<arrayList_size(subList);j++){
-                               pubsub_endpoint_pt subEP = 
(pubsub_endpoint_pt)arrayList_get(subList,j);
-                               /* Remove the subscription */
-                               pubsubAdmin_removeSubscription(admin, subEP);
-                               /* Reset the endpoint field, so that will be 
recreated from scratch when a new serializer will be found */
-                               if(subEP->endpoint!=NULL){
-                                       free(subEP->endpoint);
-                                       subEP->endpoint = NULL;
-                               }
-                               /* Add the orphan endpoint to the noSerializer 
pending list */
-                               
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-                               
arrayList_add(admin->noSerializerSubscriptions,subEP);
-                               
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-                       }
-
-                       /* Cleanup also the subscriptions hashmap*/
-                       celixThreadMutex_lock(&admin->subscriptionsLock);
-                       hash_map_iterator_pt iter = 
hashMapIterator_create(admin->subscriptions);
-                       char *key = NULL;
-                       while(hashMapIterator_hasNext(iter)){
-                               hash_map_entry_pt entry = 
hashMapIterator_nextEntry(iter);
-                               topic_subscription_pt sub = 
(topic_subscription_pt)hashMapEntry_getValue(entry);
-                               if(sub==topicSub){
-                                       key = (char*)hashMapEntry_getKey(entry);
-                                       break;
-                               }
-                       }
-                       hashMapIterator_destroy(iter);
-                       if(key!=NULL){
-                               hashMap_remove(admin->subscriptions, key);
-                               free(key);
-                       }
-                       celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-                       /* Finally destroy the topicSubscription */
-                       pubsub_topicSubscriptionDestroy(topicSub);
-               }
-               arrayList_destroy(topicSubList);
-       }
-
-
-
-       printf("PSA_ZMQ: %s serializer removed\n",serType);
-
-
-       return CELIX_SUCCESS;
-}
-
-celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, 
pubsub_endpoint_pt endpoint, double* score){
-       celix_status_t status = CELIX_SUCCESS;
-
-       celixThreadMutex_lock(&admin->serializerListLock);
-       status = 
pubsub_admin_match(endpoint->topic_props,PUBSUB_ADMIN_TYPE,admin->serializerList,score);
-       celixThreadMutex_unlock(&admin->serializerListLock);
-
-       return status;
-}
-
-/* This one recall the same logic as in the match function */
-static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt 
admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc){
-
-       celix_status_t status = CELIX_SUCCESS;
-
-       celixThreadMutex_lock(&admin->serializerListLock);
-       status = pubsub_admin_get_best_serializer(ep->topic_props, 
admin->serializerList, serSvc);
-       celixThreadMutex_unlock(&admin->serializerListLock);
-
-       return status;
-
-}
-
-static void connectTopicPubSubToSerializer(pubsub_admin_pt 
admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool 
isPublication){
-
-       celixThreadMutex_lock(&admin->usedSerializersLock);
-
-       hash_map_pt map = 
isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer;
-       array_list_pt list = (array_list_pt)hashMap_get(map,serializer);
-       if(list==NULL){
-               arrayList_create(&list);
-               hashMap_put(map,serializer,list);
-       }
-       arrayList_add(list,topicPubSub);
-
-       celixThreadMutex_unlock(&admin->usedSerializersLock);
-
-}
-
-static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void 
*topicPubSub,bool isPublication){
-
-       celixThreadMutex_lock(&admin->usedSerializersLock);
-
-       hash_map_pt map = 
isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer;
-       hash_map_iterator_pt iter = hashMapIterator_create(map);
-       while(hashMapIterator_hasNext(iter)){
-               array_list_pt list = 
(array_list_pt)hashMapIterator_nextValue(iter);
-               if(arrayList_removeElement(list, topicPubSub)){ //Found it!
-                       break;
-               }
-       }
-       hashMapIterator_destroy(iter);
-
-       celixThreadMutex_unlock(&admin->usedSerializersLock);
-
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c 
b/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
deleted file mode 100644
index e405866..0000000
--- a/pubsub/pubsub_admin_zmq/private/src/topic_publication.c
+++ /dev/null
@@ -1,630 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#include <czmq.h>
-/* The following undefs prevent the collision between:
- * - sys/syslog.h (which is included within czmq)
- * - celix/dfi/dfi_log_util.h
- */
-#undef LOG_DEBUG
-#undef LOG_WARNING
-#undef LOG_INFO
-#undef LOG_WARNING
-
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-
-#include "array_list.h"
-#include "celixbool.h"
-#include "service_registration.h"
-#include "utils.h"
-#include "service_factory.h"
-#include "version.h"
-
-#include "pubsub_common.h"
-#include "pubsub_utils.h"
-#include "publisher.h"
-
-#include "topic_publication.h"
-
-#include "pubsub_serializer.h"
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-       #include "zmq_crypto.h"
-
-       #define MAX_CERT_PATH_LENGTH 512
-#endif
-
-#define EP_ADDRESS_LEN         32
-#define ZMQ_BIND_MAX_RETRY     5
-
-#define FIRST_SEND_DELAY       2
-
-struct topic_publication {
-       zsock_t* zmq_socket;
-       celix_thread_mutex_t socket_lock; //Protects zmq_socket access
-       zcert_t * zmq_cert;
-       char* endpoint;
-       service_registration_pt svcFactoryReg;
-       array_list_pt pub_ep_list; //List<pubsub_endpoint>
-       hash_map_pt boundServices; //<bundle_pt,bound_service>
-       pubsub_serializer_service_t *serializer;
-       celix_thread_mutex_t tp_lock;
-};
-
-typedef struct publish_bundle_bound_service {
-       topic_publication_pt parent;
-       pubsub_publisher_t service;
-       bundle_pt bundle;
-       char *topic;
-       hash_map_pt msgTypes;
-       unsigned short getCount;
-       celix_thread_mutex_t mp_lock; //Protects publish_bundle_bound_service 
data structure
-       bool mp_send_in_progress;
-       array_list_pt mp_parts;
-}* publish_bundle_bound_service_pt;
-
-/* Note: correct locking order is
- * 1. tp_lock
- * 2. mp_lock
- * 3. socket_lock
- *
- * tp_lock and socket_lock are independent.
- */
-
-typedef struct pubsub_msg{
-       pubsub_msg_header_pt header;
-       char* payload;
-       int payloadSize;
-}* pubsub_msg_pt;
-
-static unsigned int rand_range(unsigned int min, unsigned int max);
-
-static celix_status_t pubsub_topicPublicationGetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service);
-static celix_status_t pubsub_topicPublicationUngetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service);
-
-static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle);
-static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt 
boundSvc);
-
-static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, 
const void *msg);
-static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int 
msgTypeId, const void *inMsg, int flags);
-static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, 
unsigned int* msgTypeId);
-
-static void delay_first_send_for_late_joiners(void);
-
-celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, 
pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* 
bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){
-       celix_status_t status = CELIX_SUCCESS;
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-       char* secure_topics = NULL;
-       bundleContext_getProperty(bundle_context, "SECURE_TOPICS", (const char 
**) &secure_topics);
-
-       if (secure_topics){
-               array_list_pt secure_topics_list = 
pubsub_getTopicsFromString(secure_topics);
-
-               int i;
-               int secure_topics_size = arrayList_size(secure_topics_list);
-               for (i = 0; i < secure_topics_size; i++){
-                       char* top = arrayList_get(secure_topics_list, i);
-                       if (strcmp(pubEP->topic, top) == 0){
-                               printf("PSA_ZMQ_TP: Secure topic: '%s'\n", top);
-                               pubEP->is_secure = true;
-                       }
-                       free(top);
-                       top = NULL;
-               }
-
-               arrayList_destroy(secure_topics_list);
-       }
-
-       zcert_t* pub_cert = NULL;
-       if (pubEP->is_secure){
-               char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
-               if (keys_bundle_dir == NULL){
-                       return CELIX_SERVICE_EXCEPTION;
-               }
-
-               const char* keys_file_path = NULL;
-               const char* keys_file_name = NULL;
-               bundleContext_getProperty(bundle_context, 
PROPERTY_KEYS_FILE_PATH, &keys_file_path);
-               bundleContext_getProperty(bundle_context, 
PROPERTY_KEYS_FILE_NAME, &keys_file_name);
-
-               char cert_path[MAX_CERT_PATH_LENGTH];
-
-               //certificate path 
".cache/bundle{id}/version0.0/./META-INF/keys/publisher/private/pub_{topic}.key"
-               snprintf(cert_path, MAX_CERT_PATH_LENGTH, 
"%s/META-INF/keys/publisher/private/pub_%s.key.enc", keys_bundle_dir, 
pubEP->topic);
-               free(keys_bundle_dir);
-               printf("PSA_ZMQ_TP: Loading key '%s'\n", cert_path);
-
-               pub_cert = get_zcert_from_encoded_file((char *) keys_file_path, 
(char *) keys_file_name, cert_path);
-               if (pub_cert == NULL){
-                       printf("PSA_ZMQ_TP: Cannot load key '%s'\n", cert_path);
-                       printf("PSA_ZMQ_TP: Topic '%s' NOT SECURED !\n", 
pubEP->topic);
-                       pubEP->is_secure = false;
-               }
-       }
-#endif
-
-       zsock_t* socket = zsock_new (ZMQ_PUB);
-       if(socket==NULL){
-               #ifdef BUILD_WITH_ZMQ_SECURITY
-                       if (pubEP->is_secure){
-                               zcert_destroy(&pub_cert);
-                       }
-               #endif
-
-        perror("Error for zmq_socket");
-               return CELIX_SERVICE_EXCEPTION;
-       }
-#ifdef BUILD_WITH_ZMQ_SECURITY
-       if (pubEP->is_secure){
-               zcert_apply (pub_cert, socket); // apply certificate to socket
-               zsock_set_curve_server (socket, true); // setup the publisher's 
socket to use the curve functions
-       }
-#endif
-
-       int rv = -1, retry=0;
-       char* ep = malloc(EP_ADDRESS_LEN);
-    char bindAddress[EP_ADDRESS_LEN];
-
-       while(rv==-1 && retry<ZMQ_BIND_MAX_RETRY){
-               /* Randomized part due to same bundle publishing on different 
topics */
-               unsigned int port = rand_range(basePort,maxPort);
-               memset(ep,0,EP_ADDRESS_LEN);
-        memset(bindAddress, 0, EP_ADDRESS_LEN);
-
-               snprintf(ep,EP_ADDRESS_LEN,"tcp://%s:%u",bindIP,port);
-        snprintf(bindAddress, EP_ADDRESS_LEN, "tcp://0.0.0.0:%u", port); 
//NOTE using a different bind addres than endpoint address
-               rv = zsock_bind (socket, "%s", bindAddress);
-        if (rv == -1) {
-            perror("Error for zmq_bind");
-        }
-               retry++;
-       }
-
-       if(rv == -1){
-               free(ep);
-               return CELIX_SERVICE_EXCEPTION;
-       }
-
-       /* ZMQ stuffs are all fine at this point. Let's create and initialize 
the structure */
-
-       topic_publication_pt pub = calloc(1,sizeof(*pub));
-
-       arrayList_create(&(pub->pub_ep_list));
-       pub->boundServices = hashMap_create(NULL,NULL,NULL,NULL);
-       celixThreadMutex_create(&(pub->tp_lock),NULL);
-
-       pub->endpoint = ep;
-       pub->zmq_socket = socket;
-       pub->serializer = best_serializer;
-
-       celixThreadMutex_create(&(pub->socket_lock),NULL);
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-       if (pubEP->is_secure){
-               pub->zmq_cert = pub_cert;
-       }
-#endif
-
-       pubsub_topicPublicationAddPublisherEP(pub,pubEP);
-
-       *out = pub;
-
-       return status;
-}
-
-celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
-       celix_status_t status = CELIX_SUCCESS;
-
-       celixThreadMutex_lock(&(pub->tp_lock));
-
-       free(pub->endpoint);
-       arrayList_destroy(pub->pub_ep_list);
-
-       hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices);
-       while(hashMapIterator_hasNext(iter)){
-               publish_bundle_bound_service_pt bound = 
hashMapIterator_nextValue(iter);
-               pubsub_destroyPublishBundleBoundService(bound);
-       }
-       hashMapIterator_destroy(iter);
-       hashMap_destroy(pub->boundServices,false,false);
-
-       pub->svcFactoryReg = NULL;
-       pub->serializer = NULL;
-#ifdef BUILD_WITH_ZMQ_SECURITY
-       zcert_destroy(&(pub->zmq_cert));
-#endif
-
-       celixThreadMutex_unlock(&(pub->tp_lock));
-
-       celixThreadMutex_destroy(&(pub->tp_lock));
-
-       celixThreadMutex_lock(&(pub->socket_lock));
-       zsock_destroy(&(pub->zmq_socket));
-       celixThreadMutex_unlock(&(pub->socket_lock));
-
-       celixThreadMutex_destroy(&(pub->socket_lock));
-
-       free(pub);
-
-       return status;
-}
-
-celix_status_t pubsub_topicPublicationStart(bundle_context_pt 
bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory){
-       celix_status_t status = CELIX_SUCCESS;
-
-       /* Let's register the new service */
-
-       pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0);
-
-       if(pubEP!=NULL){
-               service_factory_pt factory = calloc(1, sizeof(*factory));
-               factory->handle = pub;
-               factory->getService = pubsub_topicPublicationGetService;
-               factory->ungetService = pubsub_topicPublicationUngetService;
-
-               properties_pt props = properties_create();
-               properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic);
-               properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope);
-               properties_set(props,"service.version", 
PUBSUB_PUBLISHER_SERVICE_VERSION);
-
-               status = 
bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
-
-               if(status != CELIX_SUCCESS){
-                       properties_destroy(props);
-                       printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot register 
ServiceFactory for topic %s (bundle %ld).\n",pubEP->topic,pubEP->serviceID);
-               }
-               else{
-                       *svcFactory = factory;
-               }
-       }
-       else{
-               printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot find pubsub_endpoint after 
adding it...Should never happen!\n");
-               status = CELIX_SERVICE_EXCEPTION;
-       }
-
-       return status;
-}
-
-celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
-       return serviceRegistration_unregister(pub->svcFactoryReg);
-}
-
-celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep){
-
-       celixThreadMutex_lock(&(pub->tp_lock));
-       ep->endpoint = strdup(pub->endpoint);
-       arrayList_add(pub->pub_ep_list,ep);
-       celixThreadMutex_unlock(&(pub->tp_lock));
-
-       return CELIX_SUCCESS;
-}
-
-celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep){
-
-       celixThreadMutex_lock(&(pub->tp_lock));
-       for (int i = 0; i < arrayList_size(pub->pub_ep_list); i++) {
-               pubsub_endpoint_pt e = arrayList_get(pub->pub_ep_list, i);
-               if(pubsubEndpoint_equals(ep, e)) {
-                   arrayList_removeElement(pub->pub_ep_list,ep);
-                   break;
-               }
-       }
-       celixThreadMutex_unlock(&(pub->tp_lock));
-
-       return CELIX_SUCCESS;
-}
-
-array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt 
pub){
-       array_list_pt list = NULL;
-       celixThreadMutex_lock(&(pub->tp_lock));
-       list = arrayList_clone(pub->pub_ep_list);
-       celixThreadMutex_unlock(&(pub->tp_lock));
-       return list;
-}
-
-
-static celix_status_t pubsub_topicPublicationGetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service) {
-       celix_status_t  status = CELIX_SUCCESS;
-
-       topic_publication_pt publish = (topic_publication_pt)handle;
-
-       celixThreadMutex_lock(&(publish->tp_lock));
-
-       publish_bundle_bound_service_pt bound = 
(publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
-       if(bound==NULL){
-               bound = pubsub_createPublishBundleBoundService(publish,bundle);
-               if(bound!=NULL){
-                       hashMap_put(publish->boundServices,bundle,bound);
-               }
-       }
-       else{
-               bound->getCount++;
-       }
-
-       *service = &bound->service;
-
-       celixThreadMutex_unlock(&(publish->tp_lock));
-
-       return status;
-}
-
-static celix_status_t pubsub_topicPublicationUngetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service)  {
-
-       topic_publication_pt publish = (topic_publication_pt)handle;
-
-       celixThreadMutex_lock(&(publish->tp_lock));
-
-       publish_bundle_bound_service_pt bound = 
(publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
-       if(bound!=NULL){
-
-               bound->getCount--;
-               if(bound->getCount==0){
-                       pubsub_destroyPublishBundleBoundService(bound);
-                       hashMap_remove(publish->boundServices,bundle);
-               }
-
-       }
-       else{
-               long bundleId = -1;
-               bundle_getBundleId(bundle,&bundleId);
-               printf("PSA_ZMQ_TP: Unexpected ungetService call for bundle 
%ld.\n", bundleId);
-       }
-
-       /* service should be never used for unget, so let's set the pointer to 
NULL */
-       *service = NULL;
-
-       celixThreadMutex_unlock(&(publish->tp_lock));
-
-       return CELIX_SUCCESS;
-}
-
-static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_pt msg, bool last){
-
-       bool ret = true;
-
-       zframe_t* headerMsg = zframe_new(msg->header, sizeof(struct 
pubsub_msg_header));
-       if (headerMsg == NULL) ret=false;
-       zframe_t* payloadMsg = zframe_new(msg->payload, msg->payloadSize);
-       if (payloadMsg == NULL) ret=false;
-
-       delay_first_send_for_late_joiners();
-
-       if( zframe_send(&headerMsg,zmq_socket, ZFRAME_MORE) == -1) ret=false;
-
-       if(!last){
-               if( zframe_send(&payloadMsg,zmq_socket, ZFRAME_MORE) == -1) 
ret=false;
-       }
-       else{
-               if( zframe_send(&payloadMsg,zmq_socket, 0) == -1) ret=false;
-       }
-
-       if (!ret){
-               zframe_destroy(&headerMsg);
-               zframe_destroy(&payloadMsg);
-       }
-
-       free(msg->header);
-       free(msg->payload);
-       free(msg);
-
-       return ret;
-
-}
-
-static bool send_pubsub_mp_msg(zsock_t* zmq_socket, array_list_pt 
mp_msg_parts){
-
-       bool ret = true;
-
-       unsigned int i = 0;
-       unsigned int mp_num = arrayList_size(mp_msg_parts);
-       for(;i<mp_num;i++){
-               ret = ret && send_pubsub_msg(zmq_socket, 
(pubsub_msg_pt)arrayList_get(mp_msg_parts,i), (i==mp_num-1));
-       }
-       arrayList_clear(mp_msg_parts);
-
-       return ret;
-
-}
-
-static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, 
const void *msg) {
-
-       return pubsub_topicPublicationSendMultipart(handle,msgTypeId,msg, 
PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG);
-
-}
-
-static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int 
msgTypeId, const void *inMsg, int flags){
-
-       int status = 0;
-
-       publish_bundle_bound_service_pt bound = 
(publish_bundle_bound_service_pt) handle;
-
-       celixThreadMutex_lock(&(bound->parent->tp_lock));
-       celixThreadMutex_lock(&(bound->mp_lock));
-       if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & 
PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg
-               printf("PSA_ZMQ_TP: Multipart send already in progress. Cannot 
process a new one.\n");
-               celixThreadMutex_unlock(&(bound->mp_lock));
-               celixThreadMutex_unlock(&(bound->parent->tp_lock));
-               return -3;
-       }
-
-       pubsub_msg_serializer_t* msgSer = 
(pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, 
(void*)(uintptr_t)msgTypeId);
-
-       if (msgSer!= NULL) {
-               int major=0, minor=0;
-
-               pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct 
pubsub_msg_header));
-               strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
-               msg_hdr->type = msgTypeId;
-
-               if (msgSer->msgVersion != NULL){
-                       version_getMajor(msgSer->msgVersion, &major);
-                       version_getMinor(msgSer->msgVersion, &minor);
-                       msg_hdr->major = major;
-                       msg_hdr->minor = minor;
-               }
-
-               void *serializedOutput = NULL;
-               size_t serializedOutputLen = 0;
-               msgSer->serialize(msgSer,inMsg,&serializedOutput, 
&serializedOutputLen);
-
-               pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg));
-               msg->header = msg_hdr;
-               msg->payload = (char*)serializedOutput;
-               msg->payloadSize = serializedOutputLen;
-               bool snd = true;
-
-               switch(flags){
-               case PUBSUB_PUBLISHER_FIRST_MSG:
-                       bound->mp_send_in_progress = true;
-                       arrayList_add(bound->mp_parts,msg);
-                       break;
-               case PUBSUB_PUBLISHER_PART_MSG:
-                       if(!bound->mp_send_in_progress){
-                               printf("PSA_ZMQ_TP: ERROR: received msg part 
without the first part.\n");
-                               status = -4;
-                       }
-                       else{
-                               arrayList_add(bound->mp_parts,msg);
-                       }
-                       break;
-               case PUBSUB_PUBLISHER_LAST_MSG:
-                       if(!bound->mp_send_in_progress){
-                               printf("PSA_ZMQ_TP: ERROR: received end msg 
without the first part.\n");
-                               status = -4;
-                       }
-                       else{
-                               arrayList_add(bound->mp_parts,msg);
-                               snd = 
send_pubsub_mp_msg(bound->parent->zmq_socket,bound->mp_parts);
-                               bound->mp_send_in_progress = false;
-                       }
-                       break;
-               case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG:    
//Normal send case
-                       snd = 
send_pubsub_msg(bound->parent->zmq_socket,msg,true);
-                       break;
-               default:
-                       printf("PSA_ZMQ_TP: ERROR: Invalid MP flags 
combination\n");
-                       status = -4;
-                       break;
-               }
-
-               if(status==-4){
-                       free(msg);
-               }
-
-               if(!snd){
-                       printf("PSA_ZMQ_TP: Failed to send %s message 
%u.\n",flags == (PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG) ? 
"single" : "multipart", msgTypeId);
-               }
-
-       } else {
-        printf("PSA_ZMQ_TP: No msg serializer available for msg type id %d\n", 
msgTypeId);
-               status=-1;
-       }
-
-       celixThreadMutex_unlock(&(bound->mp_lock));
-       celixThreadMutex_unlock(&(bound->parent->tp_lock));
-
-       return status;
-
-}
-
-static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, 
unsigned int* msgTypeId){
-       *msgTypeId = utils_stringHash(msgType);
-       return 0;
-}
-
-
-static unsigned int rand_range(unsigned int min, unsigned int max){
-
-       double scaled = (double)(((double)random())/((double)RAND_MAX));
-       return (max-min+1)*scaled + min;
-
-}
-
-static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle){
-
-       //PRECOND lock on tp->lock
-
-       publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
-
-       if (bound != NULL) {
-
-               bound->parent = tp;
-               bound->bundle = bundle;
-               bound->getCount = 1;
-               bound->mp_send_in_progress = false;
-               celixThreadMutex_create(&bound->mp_lock,NULL);
-
-               if(tp->serializer != NULL){
-                       
tp->serializer->createSerializerMap(tp->serializer->handle,bundle,&bound->msgTypes);
-               }
-
-               arrayList_create(&bound->mp_parts);
-
-               pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
-               bound->topic=strdup(pubEP->topic);
-
-               bound->service.handle = bound;
-               bound->service.localMsgTypeIdForMsgType = 
pubsub_localMsgTypeIdForUUID;
-               bound->service.send = pubsub_topicPublicationSend;
-               bound->service.sendMultipart = 
pubsub_topicPublicationSendMultipart;
-
-       }
-
-       return bound;
-}
-
-static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt 
boundSvc){
-
-       //PRECOND lock on tp->lock
-
-       celixThreadMutex_lock(&boundSvc->mp_lock);
-
-
-       if(boundSvc->parent->serializer != NULL && boundSvc->msgTypes != NULL){
-               
boundSvc->parent->serializer->destroySerializerMap(boundSvc->parent->serializer->handle,
 boundSvc->msgTypes);
-       }
-
-       if(boundSvc->mp_parts!=NULL){
-               arrayList_destroy(boundSvc->mp_parts);
-       }
-
-       if(boundSvc->topic!=NULL){
-               free(boundSvc->topic);
-       }
-
-       celixThreadMutex_unlock(&boundSvc->mp_lock);
-       celixThreadMutex_destroy(&boundSvc->mp_lock);
-
-       free(boundSvc);
-
-}
-
-static void delay_first_send_for_late_joiners(){
-
-       static bool firstSend = true;
-
-       if(firstSend){
-               printf("PSA_ZMQ_TP: Delaying first send for late joiners...\n");
-               sleep(FIRST_SEND_DELAY);
-               firstSend = false;
-       }
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/81804e00/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c 
b/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
deleted file mode 100644
index 0e7a794..0000000
--- a/pubsub/pubsub_admin_zmq/private/src/topic_subscription.c
+++ /dev/null
@@ -1,732 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * topic_subscription.c
- *
- *  \date       Oct 2, 2015
- *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
- *  \copyright Apache License, Version 2.0
- */
-
-#include "topic_subscription.h"
-#include <czmq.h>
-/* The following undefs prevent the collision between:
- * - sys/syslog.h (which is included within czmq)
- * - celix/dfi/dfi_log_util.h
- */
-#undef LOG_DEBUG
-#undef LOG_WARNING
-#undef LOG_INFO
-#undef LOG_WARNING
-
-#include <string.h>
-#include <stdlib.h>
-#include <signal.h>
-
-#include "utils.h"
-#include "celix_errno.h"
-#include "constants.h"
-#include "version.h"
-
-#include "subscriber.h"
-#include "publisher.h"
-#include "pubsub_utils.h"
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-#include "zmq_crypto.h"
-
-#define MAX_CERT_PATH_LENGTH 512
-#endif
-
-#define POLL_TIMEOUT   250
-#define ZMQ_POLL_TIMEOUT_MS_ENV        "ZMQ_POLL_TIMEOUT_MS"
-
-struct topic_subscription{
-
-       zsock_t* zmq_socket;
-       zcert_t * zmq_cert;
-       zcert_t * zmq_pub_cert;
-       pthread_mutex_t socket_lock;
-       service_tracker_pt tracker;
-       array_list_pt sub_ep_list;
-       celix_thread_t recv_thread;
-       bool running;
-       celix_thread_mutex_t ts_lock;
-       bundle_context_pt context;
-
-       pubsub_serializer_service_t *serializer;
-
-       hash_map_pt servicesMap; // key = service, value = msg types map
-
-       celix_thread_mutex_t pendingConnections_lock;
-       array_list_pt pendingConnections;
-
-       array_list_pt pendingDisconnections;
-       celix_thread_mutex_t pendingDisconnections_lock;
-
-       unsigned int nrSubscribers;
-};
-
-typedef struct complete_zmq_msg{
-       zframe_t* header;
-       zframe_t* payload;
-}* complete_zmq_msg_pt;
-
-typedef struct mp_handle{
-       hash_map_pt svc_msg_db;
-       hash_map_pt rcv_msg_map;
-}* mp_handle_pt;
-
-typedef struct msg_map_entry{
-       bool retain;
-       void* msgInst;
-}* msg_map_entry_pt;
-
-static celix_status_t topicsub_subscriberTracked(void * handle, 
service_reference_pt reference, void * service);
-static celix_status_t topicsub_subscriberUntracked(void * handle, 
service_reference_pt reference, void * service);
-static void* zmq_recv_thread_func(void* arg);
-static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr);
-static void sigusr1_sighandler(int signo);
-static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, 
unsigned int* msgTypeId);
-static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool 
retain, void **part);
-static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt 
rcv_msg_list);
-static void destroy_mp_handle(mp_handle_pt mp_handle);
-static void connectPendingPublishers(topic_subscription_pt sub);
-static void disconnectPendingPublishers(topic_subscription_pt sub);
-
-celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt 
bundle_context, char* scope, char* topic, pubsub_serializer_service_t 
*best_serializer, topic_subscription_pt* out){
-       celix_status_t status = CELIX_SUCCESS;
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-       char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
-       if (keys_bundle_dir == NULL){
-               return CELIX_SERVICE_EXCEPTION;
-       }
-
-       const char* keys_file_path = NULL;
-       const char* keys_file_name = NULL;
-       bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, 
&keys_file_path);
-       bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, 
&keys_file_name);
-
-       char sub_cert_path[MAX_CERT_PATH_LENGTH];
-       char pub_cert_path[MAX_CERT_PATH_LENGTH];
-
-       //certificate path 
".cache/bundle{id}/version0.0/./META-INF/keys/subscriber/private/sub_{topic}.key.enc"
-       snprintf(sub_cert_path, MAX_CERT_PATH_LENGTH, 
"%s/META-INF/keys/subscriber/private/sub_%s.key.enc", keys_bundle_dir, topic);
-       snprintf(pub_cert_path, MAX_CERT_PATH_LENGTH, 
"%s/META-INF/keys/publisher/public/pub_%s.pub", keys_bundle_dir, topic);
-       free(keys_bundle_dir);
-
-       printf("PSA_ZMQ_PSA_ZMQ_TS: Loading subscriber key '%s'\n", 
sub_cert_path);
-       printf("PSA_ZMQ_PSA_ZMQ_TS: Loading publisher key '%s'\n", 
pub_cert_path);
-
-       zcert_t* sub_cert = get_zcert_from_encoded_file((char *) 
keys_file_path, (char *) keys_file_name, sub_cert_path);
-       if (sub_cert == NULL){
-               printf("PSA_ZMQ_PSA_ZMQ_TS: Cannot load key '%s'\n", 
sub_cert_path);
-               return CELIX_SERVICE_EXCEPTION;
-       }
-
-       zcert_t* pub_cert = zcert_load(pub_cert_path);
-       if (pub_cert == NULL){
-               zcert_destroy(&sub_cert);
-               printf("PSA_ZMQ_PSA_ZMQ_TS: Cannot load key '%s'\n", 
pub_cert_path);
-               return CELIX_SERVICE_EXCEPTION;
-       }
-
-       const char* pub_key = zcert_public_txt(pub_cert);
-#endif
-
-       zsock_t* zmq_s = zsock_new (ZMQ_SUB);
-       if(zmq_s==NULL){
-#ifdef BUILD_WITH_ZMQ_SECURITY
-               zcert_destroy(&sub_cert);
-               zcert_destroy(&pub_cert);
-#endif
-
-               return CELIX_SERVICE_EXCEPTION;
-       }
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-       zcert_apply (sub_cert, zmq_s);
-       zsock_set_curve_serverkey (zmq_s, pub_key); //apply key of publisher to 
socket of subscriber
-#endif
-
-       if(strcmp(topic,PUBSUB_ANY_SUB_TOPIC)==0){
-               zsock_set_subscribe (zmq_s, "");
-       }
-       else{
-               zsock_set_subscribe (zmq_s, topic);
-       }
-
-       topic_subscription_pt ts = (topic_subscription_pt) 
calloc(1,sizeof(*ts));
-       ts->context = bundle_context;
-       ts->zmq_socket = zmq_s;
-       ts->running = false;
-       ts->nrSubscribers = 0;
-       ts->serializer = best_serializer;
-
-#ifdef BUILD_WITH_ZMQ_SECURITY
-       ts->zmq_cert = sub_cert;
-       ts->zmq_pub_cert = pub_cert;
-#endif
-
-       celixThreadMutex_create(&ts->socket_lock, NULL);
-       celixThreadMutex_create(&ts->ts_lock,NULL);
-       arrayList_create(&ts->sub_ep_list);
-       ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL);
-
-       arrayList_create(&ts->pendingConnections);
-       arrayList_create(&ts->pendingDisconnections);
-       celixThreadMutex_create(&ts->pendingConnections_lock, NULL);
-       celixThreadMutex_create(&ts->pendingDisconnections_lock, NULL);
-
-       char filter[128];
-       memset(filter,0,128);
-       
if(strncmp(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT,scope,strlen(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT))
 == 0) {
-               // default scope, means that subscriber has not defined a scope 
property
-               snprintf(filter, 128, "(&(%s=%s)(%s=%s))",
-                               (char*) OSGI_FRAMEWORK_OBJECTCLASS, 
PUBSUB_SUBSCRIBER_SERVICE_NAME,
-                               PUBSUB_SUBSCRIBER_TOPIC,topic);
-
-       } else {
-               snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))",
-                               (char*) OSGI_FRAMEWORK_OBJECTCLASS, 
PUBSUB_SUBSCRIBER_SERVICE_NAME,
-                               PUBSUB_SUBSCRIBER_TOPIC,topic,
-                               PUBSUB_SUBSCRIBER_SCOPE,scope);
-       }
-       service_tracker_customizer_pt customizer = NULL;
-       status += 
serviceTrackerCustomizer_create(ts,NULL,topicsub_subscriberTracked,NULL,topicsub_subscriberUntracked,&customizer);
-       status += serviceTracker_createWithFilter(bundle_context, filter, 
customizer, &ts->tracker);
-
-       struct sigaction actions;
-       memset(&actions, 0, sizeof(actions));
-       sigemptyset(&actions.sa_mask);
-       actions.sa_flags = 0;
-       actions.sa_handler = sigusr1_sighandler;
-
-       sigaction(SIGUSR1,&actions,NULL);
-
-       *out=ts;
-
-       return status;
-}
-
-celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
-       celix_status_t status = CELIX_SUCCESS;
-
-       celixThreadMutex_lock(&ts->ts_lock);
-
-       ts->running = false;
-       serviceTracker_destroy(ts->tracker);
-       arrayList_clear(ts->sub_ep_list);
-       arrayList_destroy(ts->sub_ep_list);
-       /* TODO: Destroy all the serializer maps? */
-       hashMap_destroy(ts->servicesMap,false,false);
-
-       celixThreadMutex_lock(&ts->pendingConnections_lock);
-       arrayList_destroy(ts->pendingConnections);
-       celixThreadMutex_unlock(&ts->pendingConnections_lock);
-       celixThreadMutex_destroy(&ts->pendingConnections_lock);
-
-       celixThreadMutex_lock(&ts->pendingDisconnections_lock);
-       arrayList_destroy(ts->pendingDisconnections);
-       celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
-       celixThreadMutex_destroy(&ts->pendingDisconnections_lock);
-
-       celixThreadMutex_unlock(&ts->ts_lock);
-
-       celixThreadMutex_lock(&ts->socket_lock);
-       zsock_destroy(&(ts->zmq_socket));
-#ifdef BUILD_WITH_ZMQ_SECURITY
-       zcert_destroy(&(ts->zmq_cert));
-       zcert_destroy(&(ts->zmq_pub_cert));
-#endif
-       celixThreadMutex_unlock(&ts->socket_lock);
-       celixThreadMutex_destroy(&ts->socket_lock);
-
-       celixThreadMutex_destroy(&ts->ts_lock);
-
-       free(ts);
-
-       return status;
-}
-
-celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts){
-       celix_status_t status = CELIX_SUCCESS;
-
-       status = serviceTracker_open(ts->tracker);
-
-       ts->running = true;
-
-       if(status==CELIX_SUCCESS){
-               
status=celixThread_create(&ts->recv_thread,NULL,zmq_recv_thread_func,ts);
-       }
-
-       return status;
-}
-
-celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){
-       celix_status_t status = CELIX_SUCCESS;
-
-       ts->running = false;
-
-       pthread_kill(ts->recv_thread.thread,SIGUSR1);
-
-       celixThread_join(ts->recv_thread,NULL);
-
-       status = serviceTracker_close(ts->tracker);
-
-       return status;
-}
-
-celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt 
ts, char* pubURL){
-       celix_status_t status = CELIX_SUCCESS;
-       celixThreadMutex_lock(&ts->socket_lock);
-       if(!zsock_is(ts->zmq_socket) || 
zsock_connect(ts->zmq_socket,"%s",pubURL) != 0){
-               status = CELIX_SERVICE_EXCEPTION;
-       }
-       celixThreadMutex_unlock(&ts->socket_lock);
-
-       return status;
-}
-
-celix_status_t 
pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt 
ts, char* pubURL) {
-       celix_status_t status = CELIX_SUCCESS;
-       char *url = strdup(pubURL);
-       celixThreadMutex_lock(&ts->pendingConnections_lock);
-       arrayList_add(ts->pendingConnections, url);
-       celixThreadMutex_unlock(&ts->pendingConnections_lock);
-       return status;
-}
-
-celix_status_t 
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt
 ts, char* pubURL) {
-       celix_status_t status = CELIX_SUCCESS;
-       char *url = strdup(pubURL);
-       celixThreadMutex_lock(&ts->pendingDisconnections_lock);
-       arrayList_add(ts->pendingDisconnections, url);
-       celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
-       return status;
-}
-
-celix_status_t 
pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* 
pubURL){
-       celix_status_t status = CELIX_SUCCESS;
-
-       celixThreadMutex_lock(&ts->socket_lock);
-       if(!zsock_is(ts->zmq_socket) || 
zsock_disconnect(ts->zmq_socket,"%s",pubURL) != 0){
-               status = CELIX_SERVICE_EXCEPTION;
-       }
-       celixThreadMutex_unlock(&ts->socket_lock);
-
-       return status;
-}
-
-celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, 
pubsub_endpoint_pt subEP){
-       celix_status_t status = CELIX_SUCCESS;
-
-       celixThreadMutex_lock(&ts->ts_lock);
-       arrayList_add(ts->sub_ep_list,subEP);
-       celixThreadMutex_unlock(&ts->ts_lock);
-
-       return status;
-
-}
-
-celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt ts) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       celixThreadMutex_lock(&ts->ts_lock);
-       ts->nrSubscribers++;
-       celixThreadMutex_unlock(&ts->ts_lock);
-
-       return status;
-}
-
-celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt 
ts, pubsub_endpoint_pt subEP){
-       celix_status_t status = CELIX_SUCCESS;
-
-       celixThreadMutex_lock(&ts->ts_lock);
-       arrayList_removeElement(ts->sub_ep_list,subEP);
-       celixThreadMutex_unlock(&ts->ts_lock);
-
-       return status;
-}
-
-celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt ts) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       celixThreadMutex_lock(&ts->ts_lock);
-       ts->nrSubscribers--;
-       celixThreadMutex_unlock(&ts->ts_lock);
-
-       return status;
-}
-
-unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
-       return ts->nrSubscribers;
-}
-
-array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt 
sub){
-       return sub->sub_ep_list;
-}
-
-static celix_status_t topicsub_subscriberTracked(void * handle, 
service_reference_pt reference, void * service){
-       celix_status_t status = CELIX_SUCCESS;
-       topic_subscription_pt ts = handle;
-
-       celixThreadMutex_lock(&ts->ts_lock);
-       if (!hashMap_containsKey(ts->servicesMap, service)) {
-               bundle_pt bundle = NULL;
-               hash_map_pt msgTypes = NULL;
-
-               serviceReference_getBundle(reference, &bundle);
-
-               if(ts->serializer != NULL && bundle!=NULL){
-                       
ts->serializer->createSerializerMap(ts->serializer->handle,bundle,&msgTypes);
-                       if(msgTypes != NULL){
-                               hashMap_put(ts->servicesMap, service, msgTypes);
-                               printf("PSA_ZMQ_TS: New subscriber 
registered.\n");
-                       }
-               }
-               else{
-                       printf("PSA_ZMQ_TS: Cannot register new subscriber.\n");
-                       status = CELIX_SERVICE_EXCEPTION;
-               }
-       }
-       celixThreadMutex_unlock(&ts->ts_lock);
-
-       return status;
-}
-
-static celix_status_t topicsub_subscriberUntracked(void * handle, 
service_reference_pt reference, void * service){
-       celix_status_t status = CELIX_SUCCESS;
-       topic_subscription_pt ts = handle;
-
-       celixThreadMutex_lock(&ts->ts_lock);
-       if (hashMap_containsKey(ts->servicesMap, service)) {
-               hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service);
-               if(msgTypes!=NULL && ts->serializer!=NULL){
-                       
ts->serializer->destroySerializerMap(ts->serializer->handle,msgTypes);
-                       printf("PSA_ZMQ_TS: Subscriber unregistered.\n");
-               }
-               else{
-                       printf("PSA_ZMQ_TS: Cannot unregister subscriber.\n");
-                       status = CELIX_SERVICE_EXCEPTION;
-               }
-       }
-       celixThreadMutex_unlock(&ts->ts_lock);
-
-       return status;
-}
-
-
-static void process_msg(topic_subscription_pt sub,array_list_pt msg_list){
-
-       pubsub_msg_header_pt first_msg_hdr = 
(pubsub_msg_header_pt)zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->header);
-
-       hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap);
-       while (hashMapIterator_hasNext(iter)) {
-               hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
-               pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry);
-               hash_map_pt msgTypes = hashMapEntry_getValue(entry);
-
-               pubsub_msg_serializer_t *msgSer = 
hashMap_get(msgTypes,(void*)(uintptr_t )first_msg_hdr->type);
-               if (msgSer == NULL) {
-                       printf("PSA_ZMQ_TS: Primary message %d not supported. 
NOT sending any part of the whole message.\n",first_msg_hdr->type);
-               }
-               else{
-                       void *msgInst = NULL;
-                       bool validVersion = 
checkVersion(msgSer->msgVersion,first_msg_hdr);
-
-                       if(validVersion){
-
-                               celix_status_t status = 
msgSer->deserialize(msgSer, (const void *) 
zframe_data(((complete_zmq_msg_pt)arrayList_get(msg_list,0))->payload), 0, 
&msgInst);
-
-                               if (status == CELIX_SUCCESS) {
-                                       bool release = true;
-                                       mp_handle_pt mp_handle = 
create_mp_handle(msgTypes,msg_list);
-                                       pubsub_multipart_callbacks_t 
mp_callbacks;
-                                       mp_callbacks.handle = mp_handle;
-                                       mp_callbacks.localMsgTypeIdForMsgType = 
pubsub_localMsgTypeIdForMsgType;
-                                       mp_callbacks.getMultipart = 
pubsub_getMultipart;
-                                       subsvc->receive(subsvc->handle, 
msgSer->msgName, first_msg_hdr->type, msgInst, &mp_callbacks, &release);
-
-                                       if(release){
-                                               
msgSer->freeMsg(msgSer,msgInst); // pubsubSerializer_freeMsg(msgType, msgInst);
-                                       }
-                                       if(mp_handle!=NULL){
-                                               destroy_mp_handle(mp_handle);
-                                       }
-                               }
-                               else{
-                                       printf("PSA_ZMQ_TS: Cannot deserialize 
msgType %s.\n",msgSer->msgName);
-                               }
-
-                       }
-                       else{
-                               int major=0,minor=0;
-                               version_getMajor(msgSer->msgVersion,&major);
-                               version_getMinor(msgSer->msgVersion,&minor);
-                               printf("PSA_ZMQ_TS: Version mismatch for 
primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the 
whole message.\n",
-                                               
msgSer->msgName,major,minor,first_msg_hdr->major,first_msg_hdr->minor);
-                       }
-
-               }
-       }
-       hashMapIterator_destroy(iter);
-
-       int i = 0;
-       for(;i<arrayList_size(msg_list);i++){
-               complete_zmq_msg_pt c_msg = arrayList_get(msg_list,i);
-               zframe_destroy(&(c_msg->header));
-               zframe_destroy(&(c_msg->payload));
-               free(c_msg);
-       }
-
-       arrayList_destroy(msg_list);
-
-}
-
-static void* zmq_recv_thread_func(void * arg) {
-       topic_subscription_pt sub = (topic_subscription_pt) arg;
-
-       while (sub->running) {
-
-               celixThreadMutex_lock(&sub->socket_lock);
-
-               zframe_t* headerMsg = zframe_recv(sub->zmq_socket);
-               if (headerMsg == NULL) {
-                       if (errno == EINTR) {
-                               //It means we got a signal and we have to 
exit...
-                               printf("PSA_ZMQ_TS: header_recv thread for 
topic got a signal and will exit.\n");
-                       } else {
-                               perror("PSA_ZMQ_TS: header_recv thread");
-                       }
-               }
-               else {
-
-                       pubsub_msg_header_pt hdr = (pubsub_msg_header_pt) 
zframe_data(headerMsg);
-
-                       if (zframe_more(headerMsg)) {
-
-                               zframe_t* payloadMsg = 
zframe_recv(sub->zmq_socket);
-                               if (payloadMsg == NULL) {
-                                       if (errno == EINTR) {
-                                               //It means we got a signal and 
we have to exit...
-                                               printf("PSA_ZMQ_TS: 
payload_recv thread for topic got a signal and will exit.\n");
-                                       } else {
-                                               perror("PSA_ZMQ_TS: 
payload_recv");
-                                       }
-                                       zframe_destroy(&headerMsg);
-                               } else {
-
-                                       //Let's fetch all the messages from the 
socket
-                                       array_list_pt msg_list = NULL;
-                                       arrayList_create(&msg_list);
-                                       complete_zmq_msg_pt firstMsg = 
calloc(1, sizeof(struct complete_zmq_msg));
-                                       firstMsg->header = headerMsg;
-                                       firstMsg->payload = payloadMsg;
-                                       arrayList_add(msg_list, firstMsg);
-
-                                       bool more = zframe_more(payloadMsg);
-                                       while (more) {
-
-                                               zframe_t* h_msg = 
zframe_recv(sub->zmq_socket);
-                                               if (h_msg == NULL) {
-                                                       if (errno == EINTR) {
-                                                               //It means we 
got a signal and we have to exit...
-                                                               
printf("PSA_ZMQ_TS: h_recv thread for topic got a signal and will exit.\n");
-                                                       } else {
-                                                               
perror("PSA_ZMQ_TS: h_recv");
-                                                       }
-                                                       break;
-                                               }
-
-                                               zframe_t* p_msg = 
zframe_recv(sub->zmq_socket);
-                                               if (p_msg == NULL) {
-                                                       if (errno == EINTR) {
-                                                               //It means we 
got a signal and we have to exit...
-                                                               
printf("PSA_ZMQ_TS: p_recv thread for topic got a signal and will exit.\n");
-                                                       } else {
-                                                               
perror("PSA_ZMQ_TS: p_recv");
-                                                       }
-                                                       zframe_destroy(&h_msg);
-                                                       break;
-                                               }
-
-                                               complete_zmq_msg_pt c_msg = 
calloc(1, sizeof(struct complete_zmq_msg));
-                                               c_msg->header = h_msg;
-                                               c_msg->payload = p_msg;
-                                               arrayList_add(msg_list, c_msg);
-
-                                               if (!zframe_more(p_msg)) {
-                                                       more = false;
-                                               }
-                                       }
-
-                                       celixThreadMutex_lock(&sub->ts_lock);
-                                       process_msg(sub, msg_list);
-                                       celixThreadMutex_unlock(&sub->ts_lock);
-
-                               }
-
-                       } //zframe_more(headerMsg)
-                       else {
-                               free(headerMsg);
-                               printf("PSA_ZMQ_TS: received message %u for 
topic %s without payload!\n", hdr->type, hdr->topic);
-                       }
-
-               } // headerMsg != NULL
-               celixThreadMutex_unlock(&sub->socket_lock);
-               connectPendingPublishers(sub);
-               disconnectPendingPublishers(sub);
-       } // while
-
-       return NULL;
-}
-
-static void connectPendingPublishers(topic_subscription_pt sub) {
-       celixThreadMutex_lock(&sub->pendingConnections_lock);
-       while(!arrayList_isEmpty(sub->pendingConnections)) {
-               char * pubEP = arrayList_remove(sub->pendingConnections, 0);
-               pubsub_topicSubscriptionConnectPublisher(sub, pubEP);
-               free(pubEP);
-       }
-       celixThreadMutex_unlock(&sub->pendingConnections_lock);
-}
-
-static void disconnectPendingPublishers(topic_subscription_pt sub) {
-       celixThreadMutex_lock(&sub->pendingDisconnections_lock);
-       while(!arrayList_isEmpty(sub->pendingDisconnections)) {
-               char * pubEP = arrayList_remove(sub->pendingDisconnections, 0);
-               pubsub_topicSubscriptionDisconnectPublisher(sub, pubEP);
-               free(pubEP);
-       }
-       celixThreadMutex_unlock(&sub->pendingDisconnections_lock);
-}
-
-static void sigusr1_sighandler(int signo){
-       printf("PSA_ZMQ_TS: Topic subscription being shut down...\n");
-       return;
-}
-
-static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){
-       bool check=false;
-       int major=0,minor=0;
-
-       if(msgVersion!=NULL){
-               version_getMajor(msgVersion,&major);
-               version_getMinor(msgVersion,&minor);
-               if(hdr->major==((unsigned char)major)){ /* Different major 
means incompatible */
-                       check = (hdr->minor>=((unsigned char)minor)); /* 
Compatible only if the provider has a minor equals or greater (means compatible 
update) */
-               }
-       }
-
-       return check;
-}
-
-static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, 
unsigned int* msgTypeId){
-       *msgTypeId = utils_stringHash(msgType);
-       return 0;
-}
-
-static int pubsub_getMultipart(void *handle, unsigned int msgTypeId, bool 
retain, void **part){
-
-       if(handle==NULL){
-               *part = NULL;
-               return -1;
-       }
-
-       mp_handle_pt mp_handle = (mp_handle_pt)handle;
-       msg_map_entry_pt entry = hashMap_get(mp_handle->rcv_msg_map, 
(void*)(uintptr_t) msgTypeId);
-       if(entry!=NULL){
-               entry->retain = retain;
-               *part = entry->msgInst;
-       }
-       else{
-               printf("TP: getMultipart cannot find msg '%u'\n",msgTypeId);
-               *part=NULL;
-               return -2;
-       }
-
-       return 0;
-
-}
-
-static mp_handle_pt create_mp_handle(hash_map_pt svc_msg_db,array_list_pt 
rcv_msg_list){
-
-       if(arrayList_size(rcv_msg_list)==1){ //Means it's not a multipart 
message
-               return NULL;
-       }
-
-       mp_handle_pt mp_handle = calloc(1,sizeof(struct mp_handle));
-       mp_handle->svc_msg_db = svc_msg_db;
-       mp_handle->rcv_msg_map = hashMap_create(NULL, NULL, NULL, NULL);
-
-       int i=1; //We skip the first message, it will be handle differently
-       for(;i<arrayList_size(rcv_msg_list);i++){
-               complete_zmq_msg_pt c_msg = 
(complete_zmq_msg_pt)arrayList_get(rcv_msg_list,i);
-               pubsub_msg_header_pt header = 
(pubsub_msg_header_pt)zframe_data(c_msg->header);
-
-               pubsub_msg_serializer_t* msgSer = hashMap_get(svc_msg_db, 
(void*)(uintptr_t)(header->type));
-
-               if (msgSer!= NULL) {
-                       void *msgInst = NULL;
-
-                       bool validVersion = 
checkVersion(msgSer->msgVersion,header);
-
-                       if(validVersion){
-                               celix_status_t status = 
msgSer->deserialize(msgSer, (const void*)zframe_data(c_msg->payload), 0, 
&msgInst);
-
-                               if(status == CELIX_SUCCESS){
-                                       msg_map_entry_pt entry = 
calloc(1,sizeof(struct msg_map_entry));
-                                       entry->msgInst = msgInst;
-                                       hashMap_put(mp_handle->rcv_msg_map, 
(void*)(uintptr_t)header->type,entry);
-                               }
-                       }
-               }
-       }
-
-       return mp_handle;
-
-}
-
-static void destroy_mp_handle(mp_handle_pt mp_handle){
-
-       hash_map_iterator_pt iter = 
hashMapIterator_create(mp_handle->rcv_msg_map);
-       while(hashMapIterator_hasNext(iter)){
-               hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
-               unsigned int msgId = (unsigned 
int)(uintptr_t)hashMapEntry_getKey(entry);
-               msg_map_entry_pt msgEntry = hashMapEntry_getValue(entry);
-               pubsub_msg_serializer_t* msgSer = 
hashMap_get(mp_handle->svc_msg_db, (void*)(uintptr_t)msgId);
-
-               if(msgSer!=NULL){
-                       if(!msgEntry->retain){
-                               
msgSer->freeMsg(msgSer->handle,msgEntry->msgInst);
-                       }
-               }
-               else{
-                       printf("PSA_ZMQ_TS: ERROR: Cannot find 
messageSerializer for msg %u, so cannot destroy it!\n",msgId);
-               }
-
-               free(msgEntry);
-       }
-       hashMapIterator_destroy(iter);
-
-       hashMap_destroy(mp_handle->rcv_msg_map,false,false);
-       free(mp_handle);
-}

Reply via email to