http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/src/psa_activator.c 
b/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
new file mode 100644
index 0000000..cd4ee07
--- /dev/null
+++ b/pubsub/pubsub_admin_udp_mc/src/psa_activator.c
@@ -0,0 +1,141 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * psa_activator.c
+ *
+ *  \date       Sep 30, 2011
+ *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+
+#include "bundle_activator.h"
+#include "service_registration.h"
+#include "service_tracker.h"
+
+#include "pubsub_admin_impl.h"
+
+struct activator {
+       pubsub_admin_pt admin;
+       pubsub_admin_service_pt adminService;
+       service_registration_pt registration;
+       service_tracker_pt serializerTracker;
+};
+
+celix_status_t bundleActivator_create(bundle_context_pt context, void 
**userData) {
+       celix_status_t status = CELIX_SUCCESS;
+       struct activator *activator;
+
+       activator = calloc(1, sizeof(*activator));
+       if (!activator) {
+               status = CELIX_ENOMEM;
+       }
+       else{
+               *userData = activator;
+
+               status = pubsubAdmin_create(context, &(activator->admin));
+
+               if(status == CELIX_SUCCESS){
+                       service_tracker_customizer_pt customizer = NULL;
+                       status = 
serviceTrackerCustomizer_create(activator->admin,
+                                       NULL,
+                                       pubsubAdmin_serializerAdded,
+                                       NULL,
+                                       pubsubAdmin_serializerRemoved,
+                                       &customizer);
+                       if(status == CELIX_SUCCESS){
+                               status = serviceTracker_create(context, 
PUBSUB_SERIALIZER_SERVICE, customizer, &(activator->serializerTracker));
+                               if(status != CELIX_SUCCESS){
+                                       
serviceTrackerCustomizer_destroy(customizer);
+                                       pubsubAdmin_destroy(activator->admin);
+                               }
+                       }
+                       else{
+                               pubsubAdmin_destroy(activator->admin);
+                       }
+               }
+       }
+
+       return status;
+}
+
+celix_status_t bundleActivator_start(void * userData, bundle_context_pt 
context) {
+       celix_status_t status = CELIX_SUCCESS;
+       struct activator *activator = userData;
+       pubsub_admin_service_pt pubsubAdminSvc = calloc(1, 
sizeof(*pubsubAdminSvc));
+
+       if (!pubsubAdminSvc) {
+               status = CELIX_ENOMEM;
+       }
+       else{
+               pubsubAdminSvc->admin = activator->admin;
+
+               pubsubAdminSvc->addPublication = pubsubAdmin_addPublication;
+               pubsubAdminSvc->removePublication = 
pubsubAdmin_removePublication;
+
+               pubsubAdminSvc->addSubscription = pubsubAdmin_addSubscription;
+               pubsubAdminSvc->removeSubscription = 
pubsubAdmin_removeSubscription;
+
+               pubsubAdminSvc->closeAllPublications = 
pubsubAdmin_closeAllPublications;
+               pubsubAdminSvc->closeAllSubscriptions = 
pubsubAdmin_closeAllSubscriptions;
+
+               pubsubAdminSvc->matchEndpoint = pubsubAdmin_matchEndpoint;
+
+               activator->adminService = pubsubAdminSvc;
+
+               status = bundleContext_registerService(context, 
PUBSUB_ADMIN_SERVICE, pubsubAdminSvc, NULL, &activator->registration);
+
+               status += serviceTracker_open(activator->serializerTracker);
+
+       }
+
+
+       return status;
+}
+
+celix_status_t bundleActivator_stop(void * userData, bundle_context_pt 
context) {
+       celix_status_t status = CELIX_SUCCESS;
+       struct activator *activator = userData;
+
+       status += serviceTracker_close(activator->serializerTracker);
+       status += serviceRegistration_unregister(activator->registration);
+
+       activator->registration = NULL;
+
+       free(activator->adminService);
+       activator->adminService = NULL;
+
+       return status;
+}
+
+celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt 
context) {
+       celix_status_t status = CELIX_SUCCESS;
+       struct activator *activator = userData;
+
+       serviceTracker_destroy(activator->serializerTracker);
+       pubsubAdmin_destroy(activator->admin);
+       activator->admin = NULL;
+
+       free(activator);
+
+       return status;
+}
+
+

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c 
b/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
new file mode 100644
index 0000000..1fbdb08
--- /dev/null
+++ b/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
@@ -0,0 +1,1039 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_admin_impl.c
+ *
+ *  \date       Sep 30, 2011
+ *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+
+#ifndef ANDROID
+#include <ifaddrs.h>
+#endif
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+
+#include "constants.h"
+#include "utils.h"
+#include "hash_map.h"
+#include "array_list.h"
+#include "bundle_context.h"
+#include "bundle.h"
+#include "service_reference.h"
+#include "service_registration.h"
+#include "log_helper.h"
+#include "log_service.h"
+#include "celix_threads.h"
+#include "service_factory.h"
+
+#include "pubsub_admin_impl.h"
+#include "topic_subscription.h"
+#include "topic_publication.h"
+#include "pubsub_endpoint.h"
+#include "subscriber.h"
+#include "pubsub_admin_match.h"
+
+static const char *DEFAULT_MC_IP = "224.100.1.1";
+static char *DEFAULT_MC_PREFIX = "224.100";
+
+static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** 
ip);
+static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
+static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
+
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt 
admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc);
+static void connectTopicPubSubToSerializer(pubsub_admin_pt 
admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool 
isPublication);
+static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void 
*topicPubSub,bool isPublication);
+
+celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt 
*admin) {
+       celix_status_t status = CELIX_SUCCESS;
+
+       *admin = calloc(1, sizeof(**admin));
+
+       if (!*admin) {
+               return CELIX_ENOMEM;
+       }
+
+       char *mc_ip = NULL;
+       char *if_ip = NULL;
+       int sendSocket = -1;
+
+       if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) {
+               logHelper_start((*admin)->loghelper);
+       }
+       const char *mc_ip_prop = NULL;
+       bundleContext_getProperty(context,PSA_IP , &mc_ip_prop);
+       if(mc_ip_prop) {
+               mc_ip = strdup(mc_ip_prop);
+       }
+
+#ifndef ANDROID
+       if (mc_ip == NULL) {
+               const char *mc_prefix = NULL;
+               const char *interface = NULL;
+               int b0 = 0, b1 = 0, b2 = 0, b3 = 0;
+               bundleContext_getProperty(context,PSA_MULTICAST_IP_PREFIX , 
&mc_prefix);
+               if(mc_prefix == NULL) {
+                       mc_prefix = DEFAULT_MC_PREFIX;
+               }
+
+               bundleContext_getProperty(context, PSA_ITF, &interface);
+               if (pubsubAdmin_getIpAddress(interface, &if_ip) != 
CELIX_SUCCESS) {
+                       logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: Could not retrieve IP address for 
interface %s", interface);
+               }
+
+               printf("IP Detected : %s\n", if_ip);
+               if(if_ip && sscanf(if_ip, "%i.%i.%i.%i", &b0, &b1, &b2, &b3) != 
4) {
+                       logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: Could not parse IP address %s", if_ip);
+                       b2 = 1;
+                       b3 = 1;
+               }
+
+               asprintf(&mc_ip, "%s.%d.%d",mc_prefix, b2, b3);
+
+               sendSocket = socket(AF_INET, SOCK_DGRAM, 0);
+               if(sendSocket == -1) {
+                       perror("pubsubAdmin_create:socket");
+                       status = CELIX_SERVICE_EXCEPTION;
+               }
+
+               if(status == CELIX_SUCCESS){
+                       char loop = 1;
+                       if(setsockopt(sendSocket, IPPROTO_IP, 
IP_MULTICAST_LOOP, &loop, sizeof(loop)) != 0) {
+                               
perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_LOOP)");
+                               status = CELIX_SERVICE_EXCEPTION;
+                       }
+               }
+
+               if(status == CELIX_SUCCESS){
+                       struct in_addr multicast_interface;
+                       inet_aton(if_ip, &multicast_interface);
+                       if(setsockopt(sendSocket,  IPPROTO_IP, IP_MULTICAST_IF, 
&multicast_interface, sizeof(multicast_interface)) != 0) {
+                               
perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_IF)");
+                               status = CELIX_SERVICE_EXCEPTION;
+                       }
+               }
+
+       }
+
+
+       if(status != CELIX_SUCCESS){
+               logHelper_stop((*admin)->loghelper);
+               logHelper_destroy(&((*admin)->loghelper));
+               if(sendSocket >=0){
+                       close(sendSocket);
+               }
+               if(if_ip != NULL){
+                       free(if_ip);
+               }
+               if(mc_ip != NULL){
+                       free(mc_ip);
+               }
+               return status;
+       }
+       else{
+               (*admin)->sendSocket = sendSocket;
+       }
+
+#endif
+
+       (*admin)->bundle_context= context;
+       (*admin)->localPublications = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
+       (*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
+       (*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
+       (*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
+       (*admin)->topicSubscriptionsPerSerializer = hashMap_create(NULL, NULL, 
NULL, NULL);
+       (*admin)->topicPublicationsPerSerializer  = hashMap_create(NULL, NULL, 
NULL, NULL);
+       arrayList_create(&((*admin)->noSerializerSubscriptions));
+       arrayList_create(&((*admin)->noSerializerPublications));
+       arrayList_create(&((*admin)->serializerList));
+
+       celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL);
+       celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL);
+       celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL);
+       celixThreadMutex_create(&(*admin)->serializerListLock, NULL);
+       celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL);
+
+       celixThreadMutexAttr_create(&(*admin)->noSerializerPendingsAttr);
+       celixThreadMutexAttr_settype(&(*admin)->noSerializerPendingsAttr, 
CELIX_THREAD_MUTEX_RECURSIVE);
+       celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, 
&(*admin)->noSerializerPendingsAttr);
+
+       celixThreadMutexAttr_create(&(*admin)->pendingSubscriptionsAttr);
+       celixThreadMutexAttr_settype(&(*admin)->pendingSubscriptionsAttr, 
CELIX_THREAD_MUTEX_RECURSIVE);
+       celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, 
&(*admin)->pendingSubscriptionsAttr);
+
+       if (if_ip != NULL) {
+               logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, 
"PSA_UDP_MC: Using %s as interface for multicast communication", if_ip);
+               (*admin)->ifIpAddress = if_ip;
+       } else {
+               (*admin)->ifIpAddress = strdup("127.0.0.1");
+       }
+
+       if (mc_ip != NULL) {
+               logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, 
"PSA_UDP_MC: Using %s for service annunciation", mc_ip);
+               (*admin)->mcIpAddress = mc_ip;
+       }
+       else {
+               logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, 
"PSA_UDP_MC: No IP address for service annunciation set. Using %s", 
DEFAULT_MC_IP);
+               (*admin)->mcIpAddress = strdup(DEFAULT_MC_IP);
+       }
+
+       return status;
+}
+
+
+celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
+{
+       celix_status_t status = CELIX_SUCCESS;
+
+       free(admin->mcIpAddress);
+       free(admin->ifIpAddress);
+
+       celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
+       hash_map_iterator_pt iter = 
hashMapIterator_create(admin->pendingSubscriptions);
+       while(hashMapIterator_hasNext(iter)){
+               hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+               free((char*)hashMapEntry_getKey(entry));
+               arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry));
+       }
+       hashMapIterator_destroy(iter);
+       hashMap_destroy(admin->pendingSubscriptions,false,false);
+       celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
+
+       celixThreadMutex_lock(&admin->subscriptionsLock);
+       hashMap_destroy(admin->subscriptions,false,false);
+       celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+       celixThreadMutex_lock(&admin->localPublicationsLock);
+       hashMap_destroy(admin->localPublications,true,false);
+       celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+       celixThreadMutex_lock(&admin->externalPublicationsLock);
+       iter = hashMapIterator_create(admin->externalPublications);
+       while(hashMapIterator_hasNext(iter)){
+               hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+               free((char*)hashMapEntry_getKey(entry));
+               arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry));
+       }
+       hashMapIterator_destroy(iter);
+       hashMap_destroy(admin->externalPublications,false,false);
+       celixThreadMutex_unlock(&admin->externalPublicationsLock);
+
+       celixThreadMutex_lock(&admin->serializerListLock);
+       arrayList_destroy(admin->serializerList);
+       celixThreadMutex_unlock(&admin->serializerListLock);
+
+       celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+       arrayList_destroy(admin->noSerializerSubscriptions);
+       arrayList_destroy(admin->noSerializerPublications);
+       celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+
+       celixThreadMutex_lock(&admin->usedSerializersLock);
+
+       iter = hashMapIterator_create(admin->topicSubscriptionsPerSerializer);
+       while(hashMapIterator_hasNext(iter)){
+               
arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter));
+       }
+       hashMapIterator_destroy(iter);
+       hashMap_destroy(admin->topicSubscriptionsPerSerializer,false,false);
+
+       iter = hashMapIterator_create(admin->topicPublicationsPerSerializer);
+       while(hashMapIterator_hasNext(iter)){
+               
arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter));
+       }
+       hashMapIterator_destroy(iter);
+       hashMap_destroy(admin->topicPublicationsPerSerializer,false,false);
+
+       celixThreadMutex_unlock(&admin->usedSerializersLock);
+
+       celixThreadMutex_destroy(&admin->usedSerializersLock);
+       celixThreadMutex_destroy(&admin->serializerListLock);
+
+       celixThreadMutexAttr_destroy(&admin->noSerializerPendingsAttr);
+       celixThreadMutex_destroy(&admin->noSerializerPendingsLock);
+
+       celixThreadMutex_destroy(&admin->pendingSubscriptionsLock);
+       celixThreadMutexAttr_destroy(&admin->pendingSubscriptionsAttr);
+
+       celixThreadMutex_destroy(&admin->subscriptionsLock);
+       celixThreadMutex_destroy(&admin->localPublicationsLock);
+       celixThreadMutex_destroy(&admin->externalPublicationsLock);
+
+       logHelper_stop(admin->loghelper);
+
+       logHelper_destroy(&admin->loghelper);
+
+       free(admin);
+
+       return status;
+}
+
+static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
+       celix_status_t status = CELIX_SUCCESS;
+
+       celixThreadMutex_lock(&admin->subscriptionsLock);
+
+       topic_subscription_pt any_sub = 
hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
+
+       if(any_sub==NULL){
+
+               int i;
+               pubsub_serializer_service_t *best_serializer = NULL;
+               if( (status=pubsubAdmin_getBestSerializer(admin, subEP, 
&best_serializer)) == CELIX_SUCCESS){
+                       status = 
pubsub_topicSubscriptionCreate(admin->bundle_context, admin->ifIpAddress, 
PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, 
&any_sub);
+               }
+               else{
+                       printf("PSA_UDP_MC: Cannot find a serializer for 
subscribing topic %s. Adding it to pending list.\n",subEP->topic);
+                       celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                       arrayList_add(admin->noSerializerSubscriptions,subEP);
+                       
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+               }
+
+               if (status == CELIX_SUCCESS){
+
+                       /* Connect all internal publishers */
+                       celixThreadMutex_lock(&admin->localPublicationsLock);
+                       hash_map_iterator_pt lp_iter 
=hashMapIterator_create(admin->localPublications);
+                       while(hashMapIterator_hasNext(lp_iter)){
+                               service_factory_pt factory = 
(service_factory_pt)hashMapIterator_nextValue(lp_iter);
+                               topic_publication_pt topic_pubs = 
(topic_publication_pt)factory->handle;
+                               array_list_pt topic_publishers = 
pubsub_topicPublicationGetPublisherList(topic_pubs);
+
+                               if(topic_publishers!=NULL){
+                                       
for(i=0;i<arrayList_size(topic_publishers);i++){
+                                               pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
+                                               if(pubEP->endpoint !=NULL){
+                                                       status += 
pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
+                                               }
+                                       }
+                                       arrayList_destroy(topic_publishers);
+                               }
+                       }
+                       hashMapIterator_destroy(lp_iter);
+                       celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+                       /* Connect also all external publishers */
+                       celixThreadMutex_lock(&admin->externalPublicationsLock);
+                       hash_map_iterator_pt extp_iter 
=hashMapIterator_create(admin->externalPublications);
+                       while(hashMapIterator_hasNext(extp_iter)){
+                               array_list_pt ext_pub_list = 
(array_list_pt)hashMapIterator_nextValue(extp_iter);
+                               if(ext_pub_list!=NULL){
+                                       
for(i=0;i<arrayList_size(ext_pub_list);i++){
+                                               pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+                                               if(pubEP->endpoint !=NULL){
+                                                       status += 
pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
+                                               }
+                                       }
+                               }
+                       }
+                       hashMapIterator_destroy(extp_iter);
+                       
celixThreadMutex_unlock(&admin->externalPublicationsLock);
+
+
+                       pubsub_topicSubscriptionAddSubscriber(any_sub,subEP);
+
+                       status += pubsub_topicSubscriptionStart(any_sub);
+
+               }
+
+               if (status == CELIX_SUCCESS){
+                       
hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub);
+                       connectTopicPubSubToSerializer(admin, best_serializer, 
any_sub, false);
+               }
+
+       }
+
+       celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+       return status;
+}
+
+celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
+       celix_status_t status = CELIX_SUCCESS;
+
+       printf("PSA_UDP_MC: Received subscription [FWUUID=%s bundleID=%ld 
scope=%s, 
topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope,subEP->topic);
+
+       if(strcmp(subEP->topic,PUBSUB_ANY_SUB_TOPIC)==0){
+               return pubsubAdmin_addAnySubscription(admin,subEP);
+       }
+
+       /* Check if we already know some publisher about this topic, otherwise 
let's put the subscription in the pending hashmap */
+       celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
+       celixThreadMutex_lock(&admin->subscriptionsLock);
+       celixThreadMutex_lock(&admin->localPublicationsLock);
+       celixThreadMutex_lock(&admin->externalPublicationsLock);
+
+       char* scope_topic = createScopeTopicKey(subEP->scope,subEP->topic);
+
+       service_factory_pt factory = 
(service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
+       array_list_pt ext_pub_list = 
(array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
+
+       if(factory==NULL && ext_pub_list==NULL){ //No (local or external) 
publishers yet for this topic
+               pubsubAdmin_addSubscriptionToPendingList(admin,subEP);
+       }
+       else{
+               int i;
+               topic_subscription_pt subscription = 
hashMap_get(admin->subscriptions, scope_topic);
+
+               if(subscription == NULL) {
+                       pubsub_serializer_service_t *best_serializer = NULL;
+                       if( (status=pubsubAdmin_getBestSerializer(admin, subEP, 
&best_serializer)) == CELIX_SUCCESS){
+                               status += 
pubsub_topicSubscriptionCreate(admin->bundle_context,admin->ifIpAddress, 
subEP->scope, subEP->topic, best_serializer, &subscription);
+                       }
+                       else{
+                               printf("PSA_UDP_MC: Cannot find a serializer 
for subscribing topic %s. Adding it to pending list.\n",subEP->topic);
+                               
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                               
arrayList_add(admin->noSerializerSubscriptions,subEP);
+                               
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+                       }
+
+                       if (status==CELIX_SUCCESS){
+
+                               /* Try to connect internal publishers */
+                               if(factory!=NULL){
+                                       topic_publication_pt topic_pubs = 
(topic_publication_pt)factory->handle;
+                                       array_list_pt topic_publishers = 
pubsub_topicPublicationGetPublisherList(topic_pubs);
+
+                                       if(topic_publishers!=NULL){
+                                               
for(i=0;i<arrayList_size(topic_publishers);i++){
+                                                       pubsub_endpoint_pt 
pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
+                                                       if(pubEP->endpoint 
!=NULL){
+                                                               status += 
pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
+                                                       }
+                                               }
+                                               
arrayList_destroy(topic_publishers);
+                                       }
+
+                               }
+
+                               /* Look also for external publishers */
+                               if(ext_pub_list!=NULL){
+                                       
for(i=0;i<arrayList_size(ext_pub_list);i++){
+                                               pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+                                               if(pubEP->endpoint !=NULL){
+                                                       status += 
pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
+                                               }
+                                       }
+                               }
+
+                               
pubsub_topicSubscriptionAddSubscriber(subscription,subEP);
+
+                               status += 
pubsub_topicSubscriptionStart(subscription);
+
+                       }
+
+                       if(status==CELIX_SUCCESS){
+
+                               
hashMap_put(admin->subscriptions,strdup(scope_topic),subscription);
+
+                               connectTopicPubSubToSerializer(admin, 
best_serializer, subscription, false);
+                       }
+               }
+
+               if (status == CELIX_SUCCESS){
+                       pubsub_topicIncreaseNrSubscribers(subscription);
+               }
+       }
+
+       free(scope_topic);
+       celixThreadMutex_unlock(&admin->externalPublicationsLock);
+       celixThreadMutex_unlock(&admin->localPublicationsLock);
+       celixThreadMutex_unlock(&admin->subscriptionsLock);
+       celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
+
+       return status;
+
+}
+
+celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
+       celix_status_t status = CELIX_SUCCESS;
+
+       printf("PSA_UDP_MC: Removing subscription [FWUUID=%s bundleID=%ld 
scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, 
subEP->topic);
+
+       char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
+
+       celixThreadMutex_lock(&admin->subscriptionsLock);
+       topic_subscription_pt sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
+       if(sub!=NULL){
+               pubsub_topicDecreaseNrSubscribers(sub);
+               if(pubsub_topicGetNrSubscribers(sub) == 0) {
+                       status = 
pubsub_topicSubscriptionRemoveSubscriber(sub,subEP);
+               }
+       }
+       celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+       if(sub==NULL){
+               /* Maybe the endpoint was pending */
+               celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+               if(!arrayList_removeElement(admin->noSerializerSubscriptions, 
subEP)){
+                       status = CELIX_ILLEGAL_STATE;
+               }
+               celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+       }
+
+       free(scope_topic);
+
+
+
+       return status;
+
+}
+
+celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt 
admin,pubsub_endpoint_pt pubEP){
+       celix_status_t status = CELIX_SUCCESS;
+
+       printf("PSA_UDP_MC: Received publication [FWUUID=%s bundleID=%ld 
scope=%s, topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->scope, 
pubEP->topic);
+
+       const char* fwUUID = NULL;
+
+       
bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+       if(fwUUID==NULL){
+               printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n");
+               return CELIX_INVALID_BUNDLE_CONTEXT;
+       }
+       char* scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
+
+       if ((strcmp(pubEP->frameworkUUID, fwUUID) == 0) && (pubEP->endpoint == 
NULL)) {
+
+               celixThreadMutex_lock(&admin->localPublicationsLock);
+
+               service_factory_pt factory = (service_factory_pt) 
hashMap_get(admin->localPublications, scope_topic);
+
+               if (factory == NULL) {
+                       topic_publication_pt pub = NULL;
+                       pubsub_serializer_service_t *best_serializer = NULL;
+                       if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, 
&best_serializer)) == CELIX_SUCCESS){
+                               status = 
pubsub_topicPublicationCreate(admin->sendSocket, pubEP, best_serializer, 
admin->mcIpAddress, &pub);
+                       }
+                       else{
+                               printf("PSA_UDP_MC: Cannot find a serializer 
for publishing topic %s. Adding it to pending list.\n", pubEP->topic);
+                               
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                               
arrayList_add(admin->noSerializerPublications,pubEP);
+                               
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+                       }
+
+                       if (status == CELIX_SUCCESS) {
+                               status = 
pubsub_topicPublicationStart(admin->bundle_context, pub, &factory);
+                               if (status == CELIX_SUCCESS && factory != NULL) 
{
+                                       hashMap_put(admin->localPublications, 
strdup(scope_topic), factory);
+                                       connectTopicPubSubToSerializer(admin, 
best_serializer, pub, true);
+                               }
+                       } else {
+                               printf("PSA_UDP_MC: Cannot create a 
topicPublication for scope=%s, topic=%s (bundle %ld).\n", pubEP->scope, 
pubEP->topic, pubEP->serviceID);
+                       }
+               } else {
+                       //just add the new EP to the list
+                       topic_publication_pt pub = (topic_publication_pt) 
factory->handle;
+                       pubsub_topicPublicationAddPublisherEP(pub, pubEP);
+               }
+
+               celixThreadMutex_unlock(&admin->localPublicationsLock);
+       }
+       else{
+
+               celixThreadMutex_lock(&admin->externalPublicationsLock);
+               array_list_pt ext_pub_list = (array_list_pt) 
hashMap_get(admin->externalPublications, scope_topic);
+               if (ext_pub_list == NULL) {
+                       arrayList_create(&ext_pub_list);
+                       hashMap_put(admin->externalPublications, 
strdup(scope_topic), ext_pub_list);
+               }
+
+               arrayList_add(ext_pub_list, pubEP);
+
+               celixThreadMutex_unlock(&admin->externalPublicationsLock);
+       }
+
+       /* Re-evaluate the pending subscriptions */
+       celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
+
+       hash_map_entry_pt pendingSub = 
hashMap_getEntry(admin->pendingSubscriptions, scope_topic);
+       if (pendingSub != NULL) { //There were pending subscription for the 
just published topic. Let's connect them.
+               char* topic = (char*) hashMapEntry_getKey(pendingSub);
+               array_list_pt pendingSubList = (array_list_pt) 
hashMapEntry_getValue(pendingSub);
+               int i;
+               for (i = 0; i < arrayList_size(pendingSubList); i++) {
+                       pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) 
arrayList_get(pendingSubList, i);
+                       pubsubAdmin_addSubscription(admin, subEP);
+               }
+               hashMap_remove(admin->pendingSubscriptions, scope_topic);
+               arrayList_clear(pendingSubList);
+               arrayList_destroy(pendingSubList);
+               free(topic);
+       }
+
+       celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
+
+       /* Connect the new publisher to the subscription for his topic, if 
there is any */
+       celixThreadMutex_lock(&admin->subscriptionsLock);
+
+       topic_subscription_pt sub = (topic_subscription_pt) 
hashMap_get(admin->subscriptions, scope_topic);
+       if (sub != NULL && pubEP->endpoint != NULL) {
+               pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, 
pubEP->endpoint);
+       }
+
+       /* And check also for ANY subscription */
+       topic_subscription_pt any_sub = (topic_subscription_pt) 
hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
+       if (any_sub != NULL && pubEP->endpoint != NULL) {
+               
pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, 
pubEP->endpoint);
+       }
+
+       free(scope_topic);
+
+       celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+       return status;
+
+}
+
+celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt 
admin,pubsub_endpoint_pt pubEP){
+       celix_status_t status = CELIX_SUCCESS;
+       int count = 0;
+
+       printf("PSA_UDP_MC: Removing publication [FWUUID=%s bundleID=%ld 
scope=%s, topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->scope, 
pubEP->topic);
+
+       const char* fwUUID = NULL;
+
+       
bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
+       if(fwUUID==NULL){
+               printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n");
+               return CELIX_INVALID_BUNDLE_CONTEXT;
+       }
+       char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
+
+       if(strcmp(pubEP->frameworkUUID,fwUUID)==0){
+
+               celixThreadMutex_lock(&admin->localPublicationsLock);
+               service_factory_pt factory = 
(service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
+               if(factory!=NULL){
+                       topic_publication_pt pub = 
(topic_publication_pt)factory->handle;
+                       pubsub_topicPublicationRemovePublisherEP(pub,pubEP);
+               }
+               celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+               if(factory==NULL){
+                       /* Maybe the endpoint was pending */
+                       celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                       
if(!arrayList_removeElement(admin->noSerializerPublications, pubEP)){
+                               status = CELIX_ILLEGAL_STATE;
+                       }
+                       
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+               }
+
+       }
+       else{
+
+               celixThreadMutex_lock(&admin->externalPublicationsLock);
+               array_list_pt ext_pub_list = 
(array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
+               if(ext_pub_list!=NULL){
+                       int i;
+                       bool found = false;
+                       for(i=0;!found && i<arrayList_size(ext_pub_list);i++){
+                               pubsub_endpoint_pt p  = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+                               found = pubsubEndpoint_equals(pubEP,p);
+                               if (found){
+                                       arrayList_remove(ext_pub_list,i);
+                               }
+                       }
+                       // Check if there are more publishers on the same 
endpoint (happens when 1 celix-instance with multiple bundles publish in same 
topic)
+                       for(i=0; i<arrayList_size(ext_pub_list);i++) {
+                               pubsub_endpoint_pt p  = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
+                               if (strcmp(pubEP->endpoint,p->endpoint) == 0) {
+                                       count++;
+                               }
+                       }
+
+                       if(arrayList_size(ext_pub_list)==0){
+                               hash_map_entry_pt entry = 
hashMap_getEntry(admin->externalPublications,scope_topic);
+                               char* topic = (char*)hashMapEntry_getKey(entry);
+                               array_list_pt list = 
(array_list_pt)hashMapEntry_getValue(entry);
+                               
hashMap_remove(admin->externalPublications,topic);
+                               arrayList_destroy(list);
+                               free(topic);
+                       }
+               }
+
+               celixThreadMutex_unlock(&admin->externalPublicationsLock);
+       }
+
+       /* Check if this publisher was connected to one of our subscribers*/
+       celixThreadMutex_lock(&admin->subscriptionsLock);
+
+       topic_subscription_pt sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
+       if(sub!=NULL && pubEP->endpoint!=NULL && count == 0){
+               
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint);
+       }
+
+       /* And check also for ANY subscription */
+       topic_subscription_pt any_sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
+       if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){
+               
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,pubEP->endpoint);
+       }
+
+       free(scope_topic);
+       celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+       return status;
+
+}
+
+celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char 
*scope, char* topic){
+       celix_status_t status = CELIX_SUCCESS;
+
+       printf("PSA_UDP_MC: Closing all publications for scope=%s,topic=%s\n", 
scope, topic);
+
+       celixThreadMutex_lock(&admin->localPublicationsLock);
+       char* scope_topic =createScopeTopicKey(scope, topic);
+       hash_map_entry_pt pubsvc_entry = 
(hash_map_entry_pt)hashMap_getEntry(admin->localPublications,scope_topic);
+       if(pubsvc_entry!=NULL){
+               char* key = (char*)hashMapEntry_getKey(pubsvc_entry);
+               service_factory_pt factory= 
(service_factory_pt)hashMapEntry_getValue(pubsvc_entry);
+               topic_publication_pt pub = 
(topic_publication_pt)factory->handle;
+               status += pubsub_topicPublicationStop(pub);
+               disconnectTopicPubSubFromSerializer(admin, pub, true);
+               status += pubsub_topicPublicationDestroy(pub);
+               hashMap_remove(admin->localPublications,scope_topic);
+               free(key);
+               free(factory);
+       }
+       free(scope_topic);
+       celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+       return status;
+
+}
+
+celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char 
*scope, char* topic){
+       celix_status_t status = CELIX_SUCCESS;
+
+       printf("PSA_UDP_MC: Closing all subscriptions\n");
+
+       celixThreadMutex_lock(&admin->subscriptionsLock);
+       char* scope_topic =createScopeTopicKey(scope, topic);
+       hash_map_entry_pt sub_entry = 
(hash_map_entry_pt)hashMap_getEntry(admin->subscriptions,scope_topic);
+       if(sub_entry!=NULL){
+               char* topic = (char*)hashMapEntry_getKey(sub_entry);
+
+               topic_subscription_pt ts = 
(topic_subscription_pt)hashMapEntry_getValue(sub_entry);
+               status += pubsub_topicSubscriptionStop(ts);
+               disconnectTopicPubSubFromSerializer(admin, ts, false);
+               status += pubsub_topicSubscriptionDestroy(ts);
+               hashMap_remove(admin->subscriptions,topic);
+               free(topic);
+
+       }
+       free(scope_topic);
+       celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+       return status;
+
+}
+
+
+#ifndef ANDROID
+static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** 
ip) {
+       celix_status_t status = CELIX_BUNDLE_EXCEPTION;
+
+       struct ifaddrs *ifaddr, *ifa;
+       char host[NI_MAXHOST];
+
+       if (getifaddrs(&ifaddr) != -1)
+       {
+               for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa 
= ifa->ifa_next)
+               {
+                       if (ifa->ifa_addr == NULL)
+                               continue;
+
+                       if ((getnameinfo(ifa->ifa_addr,sizeof(struct 
sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) && 
(ifa->ifa_addr->sa_family == AF_INET)) {
+                               if (interface == NULL) {
+                                       *ip = strdup(host);
+                                       status = CELIX_SUCCESS;
+                               }
+                               else if (strcmp(ifa->ifa_name, interface) == 0) 
{
+                                       *ip = strdup(host);
+                                       status = CELIX_SUCCESS;
+                               }
+                       }
+               }
+
+               freeifaddrs(ifaddr);
+       }
+
+       return status;
+}
+#endif
+
+static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
+       celix_status_t status = CELIX_SUCCESS;
+
+       char* scope_topic =createScopeTopicKey(subEP->scope, subEP->topic);
+       array_list_pt pendingListPerTopic = 
hashMap_get(admin->pendingSubscriptions,scope_topic);
+       if(pendingListPerTopic==NULL){
+               arrayList_create(&pendingListPerTopic);
+               
hashMap_put(admin->pendingSubscriptions,strdup(scope_topic),pendingListPerTopic);
+       }
+       arrayList_add(pendingListPerTopic,subEP);
+       free(scope_topic);
+
+       return status;
+}
+
+
+celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt 
reference, void * service){
+       /* Assumption: serializers are all available at startup.
+        * If a new (possibly better) serializer is installed and started, 
already created topic_publications/subscriptions will not be destroyed and 
recreated */
+
+       celix_status_t status = CELIX_SUCCESS;
+       int i=0;
+
+       const char *serType = NULL;
+       serviceReference_getProperty(reference, 
PUBSUB_SERIALIZER_TYPE_KEY,&serType);
+       if(serType == NULL){
+               printf("Serializer serviceReference %p has no 
pubsub_serializer.type property specified\n",reference);
+               return CELIX_SERVICE_EXCEPTION;
+       }
+
+       pubsub_admin_pt admin = (pubsub_admin_pt)handle;
+       celixThreadMutex_lock(&admin->serializerListLock);
+       arrayList_add(admin->serializerList, reference);
+       celixThreadMutex_unlock(&admin->serializerListLock);
+
+       /* Now let's re-evaluate the pending */
+       celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+
+       for(i=0;i<arrayList_size(admin->noSerializerSubscriptions);i++){
+               pubsub_endpoint_pt ep = 
(pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i);
+               pubsub_serializer_service_t *best_serializer = NULL;
+               pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
+               if(best_serializer != NULL){ /* Finally we have a valid 
serializer! */
+                       pubsubAdmin_addSubscription(admin, ep);
+               }
+       }
+
+       for(i=0;i<arrayList_size(admin->noSerializerPublications);i++){
+               pubsub_endpoint_pt ep = 
(pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i);
+               pubsub_serializer_service_t *best_serializer = NULL;
+               pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
+               if(best_serializer != NULL){ /* Finally we have a valid 
serializer! */
+                       pubsubAdmin_addPublication(admin, ep);
+               }
+       }
+
+       celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+
+       printf("PSA_UDP_MC: %s serializer added\n",serType);
+
+       return status;
+}
+
+celix_status_t pubsubAdmin_serializerRemoved(void * handle, 
service_reference_pt reference, void * service){
+
+       pubsub_admin_pt admin = (pubsub_admin_pt)handle;
+       int i=0, j=0;
+       const char *serType = NULL;
+
+       serviceReference_getProperty(reference, 
PUBSUB_SERIALIZER_TYPE_KEY,&serType);
+       if(serType == NULL){
+               printf("Serializer serviceReference %p has no 
pubsub_serializer.type property specified\n",reference);
+               return CELIX_SERVICE_EXCEPTION;
+       }
+
+       celixThreadMutex_lock(&admin->serializerListLock);
+       /* Remove the serializer from the list */
+       arrayList_removeElement(admin->serializerList, reference);
+       celixThreadMutex_unlock(&admin->serializerListLock);
+
+       celixThreadMutex_lock(&admin->usedSerializersLock);
+       array_list_pt topicPubList = 
(array_list_pt)hashMap_remove(admin->topicPublicationsPerSerializer, service);
+       array_list_pt topicSubList = 
(array_list_pt)hashMap_remove(admin->topicSubscriptionsPerSerializer, service);
+       celixThreadMutex_unlock(&admin->usedSerializersLock);
+
+       /* Now destroy the topicPublications, but first put back the 
pubsub_endpoints back to the noSerializer pending list */
+       if(topicPubList!=NULL){
+               for(i=0;i<arrayList_size(topicPubList);i++){
+                       topic_publication_pt topicPub = 
(topic_publication_pt)arrayList_get(topicPubList,i);
+                       /* Stop the topic publication */
+                       pubsub_topicPublicationStop(topicPub);
+                       /* Get the endpoints that are going to be orphan */
+                       array_list_pt pubList = 
pubsub_topicPublicationGetPublisherList(topicPub);
+                       for(j=0;j<arrayList_size(pubList);j++){
+                               pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pubList,j);
+                               /* Remove the publication */
+                               pubsubAdmin_removePublication(admin, pubEP);
+                               /* Reset the endpoint field, so that will be 
recreated from scratch when a new serializer will be found */
+                               if(pubEP->endpoint!=NULL){
+                                       free(pubEP->endpoint);
+                                       pubEP->endpoint = NULL;
+                               }
+                               /* Add the orphan endpoint to the noSerializer 
pending list */
+                               
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                               
arrayList_add(admin->noSerializerPublications,pubEP);
+                               
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+                       }
+                       arrayList_destroy(pubList);
+
+                       /* Cleanup also the localPublications hashmap*/
+                       celixThreadMutex_lock(&admin->localPublicationsLock);
+                       hash_map_iterator_pt iter = 
hashMapIterator_create(admin->localPublications);
+                       char *key = NULL;
+                       service_factory_pt factory = NULL;
+                       while(hashMapIterator_hasNext(iter)){
+                               hash_map_entry_pt entry = 
hashMapIterator_nextEntry(iter);
+                               factory = 
(service_factory_pt)hashMapEntry_getValue(entry);
+                               topic_publication_pt pub = 
(topic_publication_pt)factory->handle;
+                               if(pub==topicPub){
+                                       key = (char*)hashMapEntry_getKey(entry);
+                                       break;
+                               }
+                       }
+                       hashMapIterator_destroy(iter);
+                       if(key!=NULL){
+                               hashMap_remove(admin->localPublications, key);
+                               free(factory);
+                               free(key);
+                       }
+                       celixThreadMutex_unlock(&admin->localPublicationsLock);
+
+                       /* Finally destroy the topicPublication */
+                       pubsub_topicPublicationDestroy(topicPub);
+               }
+               arrayList_destroy(topicPubList);
+       }
+
+       /* Now destroy the topicSubscriptions, but first put back the 
pubsub_endpoints back to the noSerializer pending list */
+       if(topicSubList!=NULL){
+               for(i=0;i<arrayList_size(topicSubList);i++){
+                       topic_subscription_pt topicSub = 
(topic_subscription_pt)arrayList_get(topicSubList,i);
+                       /* Stop the topic subscription */
+                       pubsub_topicSubscriptionStop(topicSub);
+                       /* Get the endpoints that are going to be orphan */
+                       array_list_pt subList = 
pubsub_topicSubscriptionGetSubscribersList(topicSub);
+                       for(j=0;j<arrayList_size(subList);j++){
+                               pubsub_endpoint_pt subEP = 
(pubsub_endpoint_pt)arrayList_get(subList,j);
+                               /* Remove the subscription */
+                               pubsubAdmin_removeSubscription(admin, subEP);
+                               /* Reset the endpoint field, so that will be 
recreated from scratch when a new serializer will be found */
+                               if(subEP->endpoint!=NULL){
+                                       free(subEP->endpoint);
+                                       subEP->endpoint = NULL;
+                               }
+                               /* Add the orphan endpoint to the noSerializer 
pending list */
+                               
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
+                               
arrayList_add(admin->noSerializerSubscriptions,subEP);
+                               
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
+                       }
+
+                       /* Cleanup also the subscriptions hashmap*/
+                       celixThreadMutex_lock(&admin->subscriptionsLock);
+                       hash_map_iterator_pt iter = 
hashMapIterator_create(admin->subscriptions);
+                       char *key = NULL;
+                       while(hashMapIterator_hasNext(iter)){
+                               hash_map_entry_pt entry = 
hashMapIterator_nextEntry(iter);
+                               topic_subscription_pt sub = 
(topic_subscription_pt)hashMapEntry_getValue(entry);
+                               if(sub==topicSub){
+                                       key = (char*)hashMapEntry_getKey(entry);
+                                       break;
+                               }
+                       }
+                       hashMapIterator_destroy(iter);
+                       if(key!=NULL){
+                               hashMap_remove(admin->subscriptions, key);
+                               free(key);
+                       }
+                       celixThreadMutex_unlock(&admin->subscriptionsLock);
+
+                       /* Finally destroy the topicSubscription */
+                       pubsub_topicSubscriptionDestroy(topicSub);
+               }
+               arrayList_destroy(topicSubList);
+       }
+
+       printf("PSA_UDP_MC: %s serializer removed\n",serType);
+
+
+       return CELIX_SUCCESS;
+}
+
+celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, 
pubsub_endpoint_pt endpoint, double* score){
+       celix_status_t status = CELIX_SUCCESS;
+
+       celixThreadMutex_lock(&admin->serializerListLock);
+       status = 
pubsub_admin_match(endpoint->topic_props,PUBSUB_ADMIN_TYPE,admin->serializerList,score);
+       celixThreadMutex_unlock(&admin->serializerListLock);
+
+       return status;
+}
+
+/* This one recall the same logic as in the match function */
+static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt 
admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc){
+
+       celix_status_t status = CELIX_SUCCESS;
+
+       celixThreadMutex_lock(&admin->serializerListLock);
+       status = pubsub_admin_get_best_serializer(ep->topic_props, 
admin->serializerList, serSvc);
+       celixThreadMutex_unlock(&admin->serializerListLock);
+
+       return status;
+
+}
+
+static void connectTopicPubSubToSerializer(pubsub_admin_pt 
admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool 
isPublication){
+
+       celixThreadMutex_lock(&admin->usedSerializersLock);
+
+       hash_map_pt map = 
isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer;
+       array_list_pt list = (array_list_pt)hashMap_get(map,serializer);
+       if(list==NULL){
+               arrayList_create(&list);
+               hashMap_put(map,serializer,list);
+       }
+       arrayList_add(list,topicPubSub);
+
+       celixThreadMutex_unlock(&admin->usedSerializersLock);
+
+}
+
+static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void 
*topicPubSub,bool isPublication){
+
+       celixThreadMutex_lock(&admin->usedSerializersLock);
+
+       hash_map_pt map = 
isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer;
+       hash_map_iterator_pt iter = hashMapIterator_create(map);
+       while(hashMapIterator_hasNext(iter)){
+               array_list_pt list = 
(array_list_pt)hashMapIterator_nextValue(iter);
+               if(arrayList_removeElement(list, topicPubSub)){ //Found it!
+                       break;
+               }
+       }
+       hashMapIterator_destroy(iter);
+
+       celixThreadMutex_unlock(&admin->usedSerializersLock);
+
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h 
b/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
new file mode 100644
index 0000000..de4b813
--- /dev/null
+++ b/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
@@ -0,0 +1,93 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * pubsub_admin_impl.h
+ *
+ *  \date       Dec 5, 2013
+ *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#ifndef PUBSUB_ADMIN_UDP_MC_IMPL_H_
+#define PUBSUB_ADMIN_UDP_MC_IMPL_H_
+
+#include "pubsub_admin.h"
+#include "log_helper.h"
+
+#define PUBSUB_ADMIN_TYPE      "udp_mc"
+
+struct pubsub_admin {
+
+       bundle_context_pt bundle_context;
+       log_helper_pt loghelper;
+
+       /* List of the available serializers */
+       celix_thread_mutex_t serializerListLock; // List<serializers>
+       array_list_pt serializerList;
+
+       celix_thread_mutex_t localPublicationsLock;
+       hash_map_pt localPublications;//<topic(string),service_factory_pt>
+
+       celix_thread_mutex_t externalPublicationsLock;
+       hash_map_pt externalPublications;//<topic(string),List<pubsub_ep>>
+
+       celix_thread_mutex_t subscriptionsLock;
+       hash_map_pt subscriptions; //<topic(string),topic_subscription>
+
+       celix_thread_mutex_t pendingSubscriptionsLock;
+       celix_thread_mutexattr_t pendingSubscriptionsAttr;
+       hash_map_pt pendingSubscriptions; //<topic(string),List<pubsub_ep>>
+
+       /* Those are used to keep track of valid subscriptions/publications 
that still have no valid serializer */
+       celix_thread_mutex_t noSerializerPendingsLock;
+       celix_thread_mutexattr_t noSerializerPendingsAttr;
+       array_list_pt noSerializerSubscriptions; // List<pubsub_ep>
+       array_list_pt noSerializerPublications; // List<pubsub_ep>
+
+       celix_thread_mutex_t usedSerializersLock;
+       hash_map_pt topicSubscriptionsPerSerializer; // 
<serializer,List<topicSubscription>>
+       hash_map_pt topicPublicationsPerSerializer; // 
<serializer,List<topicPublications>>
+
+       char* ifIpAddress; // The local interface which is used for multicast 
communication
+       char* mcIpAddress; // The multicast IP address
+
+       int sendSocket;
+       void* zmq_context; // to be removed
+
+};
+
+celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt 
*admin);
+celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin);
+
+celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
+celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
+
+celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt 
admin,pubsub_endpoint_pt pubEP);
+celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt 
admin,pubsub_endpoint_pt pubEP);
+
+celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin,char* 
scope, char* topic);
+celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* 
scope, char* topic);
+
+celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt 
reference, void * service);
+celix_status_t pubsubAdmin_serializerRemoved(void * handle, 
service_reference_pt reference, void * service);
+
+celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, 
pubsub_endpoint_pt endpoint, double* score);
+
+
+#endif /* PUBSUB_ADMIN_UDP_MC_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/src/topic_publication.c 
b/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
new file mode 100644
index 0000000..e43ec29
--- /dev/null
+++ b/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
@@ -0,0 +1,444 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  htPSA_UDP_MC_TP://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * topic_publication.c
+ *
+ *  \date       Sep 24, 2015
+ *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "array_list.h"
+#include "celixbool.h"
+#include "service_registration.h"
+#include "utils.h"
+#include "service_factory.h"
+#include "version.h"
+
+#include "topic_publication.h"
+#include "pubsub_common.h"
+#include "publisher.h"
+#include "large_udp.h"
+
+#include "pubsub_serializer.h"
+
+#define EP_ADDRESS_LEN         32
+
+#define FIRST_SEND_DELAY       2
+
+struct topic_publication {
+       int sendSocket;
+       char* endpoint;
+       service_registration_pt svcFactoryReg;
+       array_list_pt pub_ep_list; //List<pubsub_endpoint>
+       hash_map_pt boundServices; //<bundle_pt,bound_service>
+       celix_thread_mutex_t tp_lock;
+       pubsub_serializer_service_t *serializer;
+       struct sockaddr_in destAddr;
+};
+
+typedef struct publish_bundle_bound_service {
+       topic_publication_pt parent;
+       pubsub_publisher_t service;
+       bundle_pt bundle;
+       char *scope;
+       char *topic;
+       hash_map_pt msgTypes;
+       unsigned short getCount;
+       celix_thread_mutex_t mp_lock;
+       largeUdp_pt largeUdpHandle;
+}* publish_bundle_bound_service_pt;
+
+
+typedef struct pubsub_msg{
+       pubsub_msg_header_pt header;
+       char* payload;
+       unsigned int payloadSize;
+} pubsub_msg_t;
+
+
+static unsigned int rand_range(unsigned int min, unsigned int max);
+
+static celix_status_t pubsub_topicPublicationGetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service);
+static celix_status_t pubsub_topicPublicationUngetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service);
+
+static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle);
+static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt 
boundSvc);
+
+static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, 
const void *msg);
+
+static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, 
unsigned int* msgTypeId);
+
+
+static void delay_first_send_for_late_joiners(void);
+
+
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, 
pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* 
bindIP, topic_publication_pt *out){
+
+       char* ep = malloc(EP_ADDRESS_LEN);
+       memset(ep,0,EP_ADDRESS_LEN);
+       unsigned int port = pubEP->serviceID + 
rand_range(UDP_BASE_PORT+pubEP->serviceID+3, UDP_MAX_PORT);
+       snprintf(ep,EP_ADDRESS_LEN,"udp://%s:%u",bindIP,port);
+
+
+       topic_publication_pt pub = calloc(1,sizeof(*pub));
+
+       arrayList_create(&(pub->pub_ep_list));
+       pub->boundServices = hashMap_create(NULL,NULL,NULL,NULL);
+       celixThreadMutex_create(&(pub->tp_lock),NULL);
+
+       pub->endpoint = ep;
+       pub->sendSocket = sendSocket;
+       pub->destAddr.sin_family = AF_INET;
+       pub->destAddr.sin_addr.s_addr = inet_addr(bindIP);
+       pub->destAddr.sin_port = htons(port);
+
+       pub->serializer = best_serializer;
+
+       pubsub_topicPublicationAddPublisherEP(pub,pubEP);
+
+       *out = pub;
+
+       return CELIX_SUCCESS;
+}
+
+celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
+       celix_status_t status = CELIX_SUCCESS;
+
+       celixThreadMutex_lock(&(pub->tp_lock));
+
+       free(pub->endpoint);
+       arrayList_destroy(pub->pub_ep_list);
+
+       hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices);
+       while(hashMapIterator_hasNext(iter)){
+               publish_bundle_bound_service_pt bound = 
hashMapIterator_nextValue(iter);
+               pubsub_destroyPublishBundleBoundService(bound);
+       }
+       hashMapIterator_destroy(iter);
+       hashMap_destroy(pub->boundServices,false,false);
+
+       pub->svcFactoryReg = NULL;
+       pub->serializer = NULL;
+
+       if(close(pub->sendSocket) != 0){
+               status = CELIX_FILE_IO_EXCEPTION;
+       }
+
+       celixThreadMutex_unlock(&(pub->tp_lock));
+
+       celixThreadMutex_destroy(&(pub->tp_lock));
+
+       free(pub);
+
+       return status;
+}
+
+celix_status_t pubsub_topicPublicationStart(bundle_context_pt 
bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory){
+       celix_status_t status = CELIX_SUCCESS;
+
+       /* Let's register the new service */
+
+       pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0);
+
+       if(pubEP!=NULL){
+               service_factory_pt factory = calloc(1, sizeof(*factory));
+               factory->handle = pub;
+               factory->getService = pubsub_topicPublicationGetService;
+               factory->ungetService = pubsub_topicPublicationUngetService;
+
+               properties_pt props = properties_create();
+               properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope);
+               properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic);
+
+               status = 
bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
+
+               if(status != CELIX_SUCCESS){
+                       properties_destroy(props);
+                       printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot register 
ServiceFactory for topic %s, topic %s (bundle %ld).\n",pubEP->scope, 
pubEP->topic,pubEP->serviceID);
+               }
+               else{
+                       *svcFactory = factory;
+               }
+       }
+       else{
+               printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot find pubsub_endpoint 
after adding it...Should never happen!\n");
+               status = CELIX_SERVICE_EXCEPTION;
+       }
+
+       return status;
+}
+
+celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
+       return serviceRegistration_unregister(pub->svcFactoryReg);
+}
+
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep){
+
+       celixThreadMutex_lock(&(pub->tp_lock));
+       ep->endpoint = strdup(pub->endpoint);
+       arrayList_add(pub->pub_ep_list,ep);
+       celixThreadMutex_unlock(&(pub->tp_lock));
+
+       return CELIX_SUCCESS;
+}
+
+celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep){
+
+       celixThreadMutex_lock(&(pub->tp_lock));
+       arrayList_removeElement(pub->pub_ep_list,ep);
+       celixThreadMutex_unlock(&(pub->tp_lock));
+
+       return CELIX_SUCCESS;
+}
+
+array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt 
pub){
+       array_list_pt list = NULL;
+       celixThreadMutex_lock(&(pub->tp_lock));
+       list = arrayList_clone(pub->pub_ep_list);
+       celixThreadMutex_unlock(&(pub->tp_lock));
+       return list;
+}
+
+
+static celix_status_t pubsub_topicPublicationGetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service) {
+       celix_status_t  status = CELIX_SUCCESS;
+
+       topic_publication_pt publish = (topic_publication_pt)handle;
+
+       celixThreadMutex_lock(&(publish->tp_lock));
+
+       publish_bundle_bound_service_pt bound = 
(publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
+       if(bound==NULL){
+               bound = pubsub_createPublishBundleBoundService(publish,bundle);
+               if(bound!=NULL){
+                       hashMap_put(publish->boundServices,bundle,bound);
+               }
+       }
+       else{
+               bound->getCount++;
+       }
+
+       if (bound != NULL) {
+               *service = &bound->service;
+       }
+
+       celixThreadMutex_unlock(&(publish->tp_lock));
+
+       return status;
+}
+
+static celix_status_t pubsub_topicPublicationUngetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service)  {
+
+       topic_publication_pt publish = (topic_publication_pt)handle;
+
+       celixThreadMutex_lock(&(publish->tp_lock));
+
+       publish_bundle_bound_service_pt bound = 
(publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
+       if(bound!=NULL){
+
+               bound->getCount--;
+               if(bound->getCount==0){
+                       pubsub_destroyPublishBundleBoundService(bound);
+                       hashMap_remove(publish->boundServices,bundle);
+               }
+
+       }
+       else{
+               long bundleId = -1;
+               bundle_getBundleId(bundle,&bundleId);
+               printf("PSA_UDP_MC_TP: Unexpected ungetService call for bundle 
%ld.\n", bundleId);
+       }
+
+       /* service should be never used for unget, so let's set the pointer to 
NULL */
+       *service = NULL;
+
+       celixThreadMutex_unlock(&(publish->tp_lock));
+
+       return CELIX_SUCCESS;
+}
+
+static bool send_pubsub_msg(publish_bundle_bound_service_pt bound, 
pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback){
+       const int iovec_len = 3; // header + size + payload
+       bool ret = true;
+
+       struct iovec msg_iovec[iovec_len];
+       msg_iovec[0].iov_base = msg->header;
+       msg_iovec[0].iov_len = sizeof(*msg->header);
+       msg_iovec[1].iov_base = &msg->payloadSize;
+       msg_iovec[1].iov_len = sizeof(msg->payloadSize);
+       msg_iovec[2].iov_base = msg->payload;
+       msg_iovec[2].iov_len = msg->payloadSize;
+
+       delay_first_send_for_late_joiners();
+
+       if(largeUdp_sendmsg(bound->largeUdpHandle, bound->parent->sendSocket, 
msg_iovec, iovec_len, 0, &bound->parent->destAddr, 
sizeof(bound->parent->destAddr)) == -1) {
+               perror("send_pubsub_msg:sendSocket");
+               ret = false;
+       }
+
+       if(releaseCallback) {
+               releaseCallback->release(msg->payload, bound);
+       }
+       return ret;
+
+}
+
+
+static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, 
const void *inMsg) {
+       int status = 0;
+       publish_bundle_bound_service_pt bound = 
(publish_bundle_bound_service_pt) handle;
+
+       celixThreadMutex_lock(&(bound->parent->tp_lock));
+       celixThreadMutex_lock(&(bound->mp_lock));
+
+       pubsub_msg_serializer_t* msgSer = 
(pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, 
(void*)(uintptr_t)msgTypeId);
+
+       if (msgSer != NULL) {
+               int major=0, minor=0;
+
+               pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct 
pubsub_msg_header));
+               strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
+               msg_hdr->type = msgTypeId;
+
+
+               if (msgSer->msgVersion != NULL){
+                       version_getMajor(msgSer->msgVersion, &major);
+                       version_getMinor(msgSer->msgVersion, &minor);
+                       msg_hdr->major = major;
+                       msg_hdr->minor = minor;
+               }
+
+               void* serializedOutput = NULL;
+               size_t serializedOutputLen = 0;
+               msgSer->serialize(msgSer,inMsg,&serializedOutput, 
&serializedOutputLen);
+
+               pubsub_msg_t *msg = calloc(1,sizeof(pubsub_msg_t));
+               msg->header = msg_hdr;
+               msg->payload = (char*)serializedOutput;
+               msg->payloadSize = serializedOutputLen;
+
+
+               if(send_pubsub_msg(bound, msg,true, NULL) == false) {
+                       status = -1;
+               }
+               free(msg_hdr);
+               free(msg);
+               free(serializedOutput);
+
+
+       } else {
+               printf("PSA_UDP_MC_TP: No msg serializer available for msg type 
id %d\n", msgTypeId);
+               status=-1;
+       }
+
+       celixThreadMutex_unlock(&(bound->mp_lock));
+       celixThreadMutex_unlock(&(bound->parent->tp_lock));
+
+       return status;
+}
+
+static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, 
unsigned int* msgTypeId){
+       *msgTypeId = utils_stringHash(msgType);
+       return 0;
+}
+
+
+static unsigned int rand_range(unsigned int min, unsigned int max){
+
+       double scaled = (double)(((double)random())/((double)RAND_MAX));
+       return (max-min+1)*scaled + min;
+
+}
+
+static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle){
+
+       publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
+
+       if (bound != NULL) {
+
+               bound->parent = tp;
+               bound->bundle = bundle;
+               bound->getCount = 1;
+               celixThreadMutex_create(&bound->mp_lock,NULL);
+
+               if(tp->serializer != NULL){
+                       
tp->serializer->createSerializerMap(tp->serializer->handle,bundle,&bound->msgTypes);
+               }
+
+               pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
+               bound->scope=strdup(pubEP->scope);
+               bound->topic=strdup(pubEP->topic);
+               bound->largeUdpHandle = largeUdp_create(1);
+
+               bound->service.handle = bound;
+               bound->service.localMsgTypeIdForMsgType = 
pubsub_localMsgTypeIdForUUID;
+               bound->service.send = pubsub_topicPublicationSend;
+               bound->service.sendMultipart = NULL;  //Multipart not supported 
for UDP
+
+       }
+
+       return bound;
+}
+
+static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt 
boundSvc){
+
+       celixThreadMutex_lock(&boundSvc->mp_lock);
+
+       if(boundSvc->parent->serializer != NULL && boundSvc->msgTypes != NULL){
+               
boundSvc->parent->serializer->destroySerializerMap(boundSvc->parent->serializer->handle,
 boundSvc->msgTypes);
+       }
+
+       if(boundSvc->scope!=NULL){
+               free(boundSvc->scope);
+       }
+
+       if(boundSvc->topic!=NULL){
+               free(boundSvc->topic);
+       }
+
+       largeUdp_destroy(boundSvc->largeUdpHandle);
+
+       celixThreadMutex_unlock(&boundSvc->mp_lock);
+       celixThreadMutex_destroy(&boundSvc->mp_lock);
+
+       free(boundSvc);
+
+}
+
+static void delay_first_send_for_late_joiners(){
+
+       static bool firstSend = true;
+
+       if(firstSend){
+               printf("PSA_UDP_MC_TP: Delaying first send for late 
joiners...\n");
+               sleep(FIRST_SEND_DELAY);
+               firstSend = false;
+       }
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_admin_udp_mc/src/topic_publication.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/src/topic_publication.h 
b/pubsub/pubsub_admin_udp_mc/src/topic_publication.h
new file mode 100644
index 0000000..4363d71
--- /dev/null
+++ b/pubsub/pubsub_admin_udp_mc/src/topic_publication.h
@@ -0,0 +1,57 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * topic_publication.h
+ *
+ *  \date       Sep 24, 2015
+ *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#ifndef TOPIC_PUBLICATION_H_
+#define TOPIC_PUBLICATION_H_
+
+#include "publisher.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_common.h"
+
+#include "pubsub_serializer.h"
+
+#define UDP_BASE_PORT  49152
+#define UDP_MAX_PORT   65000
+
+typedef struct pubsub_udp_msg {
+    struct pubsub_msg_header header;
+    unsigned int payloadSize;
+    char payload[];
+} pubsub_udp_msg_t;
+
+typedef struct topic_publication *topic_publication_pt;
+celix_status_t pubsub_topicPublicationCreate(int sendSocket, 
pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* 
bindIP, topic_publication_pt *out);
+celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
+
+celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep);
+celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep);
+
+celix_status_t pubsub_topicPublicationStart(bundle_context_pt 
bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
+celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);
+
+array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt 
pub);
+
+#endif /* TOPIC_PUBLICATION_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c 
b/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c
new file mode 100644
index 0000000..d8e6f45
--- /dev/null
+++ b/pubsub/pubsub_admin_udp_mc/src/topic_subscription.c
@@ -0,0 +1,635 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * topic_subscription.c
+ *
+ *  \date       Oct 2, 2015
+ *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <signal.h>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/epoll.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "utils.h"
+#include "celix_errno.h"
+#include "constants.h"
+#include "version.h"
+
+#include "topic_subscription.h"
+#include "topic_publication.h"
+#include "subscriber.h"
+#include "publisher.h"
+#include "large_udp.h"
+
+#include "pubsub_serializer.h"
+
+#define MAX_EPOLL_EVENTS        10
+#define RECV_THREAD_TIMEOUT     5
+#define UDP_BUFFER_SIZE         65535
+#define MAX_UDP_SESSIONS        16
+
+struct topic_subscription{
+       char* ifIpAddress;
+       service_tracker_pt tracker;
+       array_list_pt sub_ep_list;
+       celix_thread_t recv_thread;
+       bool running;
+       celix_thread_mutex_t ts_lock;
+       bundle_context_pt context;
+
+       pubsub_serializer_service_t *serializer;
+
+       int topicEpollFd; // EPOLL filedescriptor where the sockets are 
registered.
+       hash_map_pt servicesMap; // key = service, value = msg types map
+       hash_map_pt socketMap; // key = URL, value = listen-socket
+       celix_thread_mutex_t socketMap_lock;
+
+       celix_thread_mutex_t pendingConnections_lock;
+       array_list_pt pendingConnections;
+
+       array_list_pt pendingDisconnections;
+       celix_thread_mutex_t pendingDisconnections_lock;
+
+       //array_list_pt rawServices;
+       unsigned int nrSubscribers;
+       largeUdp_pt largeUdpHandle;
+};
+
+typedef struct msg_map_entry{
+       bool retain;
+       void* msgInst;
+}* msg_map_entry_pt;
+
+static celix_status_t topicsub_subscriberTracked(void * handle, 
service_reference_pt reference, void * service);
+static celix_status_t topicsub_subscriberUntracked(void * handle, 
service_reference_pt reference, void * service);
+static void* udp_recv_thread_func(void* arg);
+static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr);
+static void sigusr1_sighandler(int signo);
+static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, 
unsigned int* msgTypeId);
+static void connectPendingPublishers(topic_subscription_pt sub);
+static void disconnectPendingPublishers(topic_subscription_pt sub);
+
+
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt 
bundle_context, char* ifIp,char* scope, char* topic 
,pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out){
+       celix_status_t status = CELIX_SUCCESS;
+
+       topic_subscription_pt ts = (topic_subscription_pt) 
calloc(1,sizeof(*ts));
+       ts->context = bundle_context;
+       ts->ifIpAddress = strdup(ifIp);
+#if defined(__APPLE__) && defined(__MACH__)
+       //TODO: Use kqueue for OSX
+#else
+       ts->topicEpollFd = epoll_create1(0);
+#endif
+       if(ts->topicEpollFd == -1) {
+               status += CELIX_SERVICE_EXCEPTION;
+       }
+
+       ts->running = false;
+       ts->nrSubscribers = 0;
+       ts->serializer = best_serializer;
+
+       celixThreadMutex_create(&ts->ts_lock,NULL);
+       arrayList_create(&ts->sub_ep_list);
+       ts->servicesMap = hashMap_create(NULL, NULL, NULL, NULL);
+       ts->socketMap =  hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
+
+       arrayList_create(&ts->pendingConnections);
+       arrayList_create(&ts->pendingDisconnections);
+       celixThreadMutex_create(&ts->pendingConnections_lock, NULL);
+       celixThreadMutex_create(&ts->pendingDisconnections_lock, NULL);
+       celixThreadMutex_create(&ts->socketMap_lock, NULL);
+
+       ts->largeUdpHandle = largeUdp_create(MAX_UDP_SESSIONS);
+
+       char filter[128];
+       memset(filter,0,128);
+       if(strncmp(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, scope, 
strlen(PUBSUB_SUBSCRIBER_SCOPE_DEFAULT)) == 0) {
+               // default scope, means that subscriber has not defined a scope 
property
+               snprintf(filter, 128, "(&(%s=%s)(%s=%s))",
+                               (char*) OSGI_FRAMEWORK_OBJECTCLASS, 
PUBSUB_SUBSCRIBER_SERVICE_NAME,
+                               PUBSUB_SUBSCRIBER_TOPIC,topic);
+
+       } else {
+               snprintf(filter, 128, "(&(%s=%s)(%s=%s)(%s=%s))",
+                               (char*) OSGI_FRAMEWORK_OBJECTCLASS, 
PUBSUB_SUBSCRIBER_SERVICE_NAME,
+                               PUBSUB_SUBSCRIBER_TOPIC,topic,
+                               PUBSUB_SUBSCRIBER_SCOPE,scope);
+       }
+
+       service_tracker_customizer_pt customizer = NULL;
+       status += 
serviceTrackerCustomizer_create(ts,NULL,topicsub_subscriberTracked,NULL,topicsub_subscriberUntracked,&customizer);
+       status += serviceTracker_createWithFilter(bundle_context, filter, 
customizer, &ts->tracker);
+
+       struct sigaction actions;
+       memset(&actions, 0, sizeof(actions));
+       sigemptyset(&actions.sa_mask);
+       actions.sa_flags = 0;
+       actions.sa_handler = sigusr1_sighandler;
+
+       sigaction(SIGUSR1,&actions,NULL);
+
+       if (status == CELIX_SUCCESS) {
+               *out=ts;
+       }
+
+       return status;
+}
+
+celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts){
+       celix_status_t status = CELIX_SUCCESS;
+
+       celixThreadMutex_lock(&ts->ts_lock);
+       ts->running = false;
+       free(ts->ifIpAddress);
+       serviceTracker_destroy(ts->tracker);
+       arrayList_clear(ts->sub_ep_list);
+       arrayList_destroy(ts->sub_ep_list);
+       hashMap_destroy(ts->servicesMap,false,false);
+
+       celixThreadMutex_lock(&ts->socketMap_lock);
+       hashMap_destroy(ts->socketMap,true,true);
+       celixThreadMutex_unlock(&ts->socketMap_lock);
+       celixThreadMutex_destroy(&ts->socketMap_lock);
+
+       celixThreadMutex_lock(&ts->pendingConnections_lock);
+       arrayList_destroy(ts->pendingConnections);
+       celixThreadMutex_unlock(&ts->pendingConnections_lock);
+       celixThreadMutex_destroy(&ts->pendingConnections_lock);
+
+       celixThreadMutex_lock(&ts->pendingDisconnections_lock);
+       arrayList_destroy(ts->pendingDisconnections);
+       celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
+       celixThreadMutex_destroy(&ts->pendingDisconnections_lock);
+
+       largeUdp_destroy(ts->largeUdpHandle);
+#if defined(__APPLE__) && defined(__MACH__)
+       //TODO: Use kqueue for OSX
+#else
+       close(ts->topicEpollFd);
+#endif
+
+       celixThreadMutex_unlock(&ts->ts_lock);
+
+       celixThreadMutex_destroy(&ts->ts_lock);
+
+       free(ts);
+
+       return status;
+}
+
+celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts){
+       celix_status_t status = CELIX_SUCCESS;
+
+       status = serviceTracker_open(ts->tracker);
+
+       ts->running = true;
+
+       if(status==CELIX_SUCCESS){
+               
status=celixThread_create(&ts->recv_thread,NULL,udp_recv_thread_func,ts);
+       }
+
+       return status;
+}
+
+celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts){
+       celix_status_t status = CELIX_SUCCESS;
+       struct epoll_event ev;
+       memset(&ev, 0, sizeof(ev));
+
+       ts->running = false;
+
+       pthread_kill(ts->recv_thread.thread,SIGUSR1);
+
+       celixThread_join(ts->recv_thread,NULL);
+
+       status = serviceTracker_close(ts->tracker);
+
+       celixThreadMutex_lock(&ts->socketMap_lock);
+       hash_map_iterator_pt it = hashMapIterator_create(ts->socketMap);
+       while(hashMapIterator_hasNext(it)) {
+               hash_map_entry_pt entry = hashMapIterator_nextEntry(it);
+               char *url = hashMapEntry_getKey(entry);
+               int *s = hashMapEntry_getValue(entry);
+               memset(&ev, 0, sizeof(ev));
+               if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_DEL, *s, &ev) == -1) {
+                       printf("in if error()\n");
+                       perror("epoll_ctl() EPOLL_CTL_DEL");
+                       status += CELIX_SERVICE_EXCEPTION;
+               }
+               free(s);
+               free(url);
+               //hashMapIterator_remove(it);
+       }
+       hashMapIterator_destroy(it);
+       hashMap_clear(ts->socketMap, false, false);
+       celixThreadMutex_unlock(&ts->socketMap_lock);
+
+
+       return status;
+}
+
+celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt 
ts, char* pubURL) {
+
+       printf("pubsub_topicSubscriptionConnectPublisher : pubURL = %s\n", 
pubURL);
+
+       celix_status_t status = CELIX_SUCCESS;
+
+       celixThreadMutex_lock(&ts->socketMap_lock);
+
+       if(!hashMap_containsKey(ts->socketMap, pubURL)){
+
+               int *recvSocket = calloc(sizeof(int), 1);
+               *recvSocket = socket(AF_INET, SOCK_DGRAM, 0);
+               if (*recvSocket < 0) {
+                       perror("pubsub_topicSubscriptionCreate:socket");
+                       status = CELIX_SERVICE_EXCEPTION;
+               }
+
+               if (status == CELIX_SUCCESS){
+                       int reuse = 1;
+                       if (setsockopt(*recvSocket, SOL_SOCKET, SO_REUSEADDR, 
(char*) &reuse, sizeof(reuse)) != 0) {
+                               perror("setsockopt() SO_REUSEADDR");
+                               status = CELIX_SERVICE_EXCEPTION;
+                       }
+               }
+
+               if(status == CELIX_SUCCESS){
+                       // TODO Check if there is a better way to parse the URL 
to IP/Portnr
+                       //replace ':' by spaces
+                       char *url = strdup(pubURL);
+                       char *pt = url;
+                       while((pt=strchr(pt, ':')) != NULL) {
+                               *pt = ' ';
+                       }
+                       char mcIp[100];
+                       unsigned short mcPort;
+                       sscanf(url, "udp //%s %hu", mcIp, &mcPort);
+                       free(url);
+
+                       printf("pubsub_topicSubscriptionConnectPublisher : IP = 
%s, Port = %hu\n", mcIp, mcPort);
+
+                       struct ip_mreq mc_addr;
+                       mc_addr.imr_multiaddr.s_addr = inet_addr(mcIp);
+                       mc_addr.imr_interface.s_addr = 
inet_addr(ts->ifIpAddress);
+                       printf("Adding MC %s at interface %s\n", mcIp, 
ts->ifIpAddress);
+                       if (setsockopt(*recvSocket, IPPROTO_IP, 
IP_ADD_MEMBERSHIP, (char*) &mc_addr, sizeof(mc_addr)) != 0) {
+                               perror("setsockopt() IP_ADD_MEMBERSHIP");
+                               status = CELIX_SERVICE_EXCEPTION;
+                       }
+
+                       if (status == CELIX_SUCCESS){
+                               struct sockaddr_in mcListenAddr;
+                               mcListenAddr.sin_family = AF_INET;
+                               mcListenAddr.sin_addr.s_addr = INADDR_ANY;
+                               mcListenAddr.sin_port = htons(mcPort);
+                               if(bind(*recvSocket, (struct 
sockaddr*)&mcListenAddr, sizeof(mcListenAddr)) != 0) {
+                                       perror("bind()");
+                                       status = CELIX_SERVICE_EXCEPTION;
+                               }
+                       }
+
+                       if (status == CELIX_SUCCESS){
+#if defined(__APPLE__) && defined(__MACH__)
+                               //TODO: Use kqueue for OSX
+#else
+                               struct epoll_event ev;
+                               memset(&ev, 0, sizeof(ev));
+                               ev.events = EPOLLIN;
+                               ev.data.fd = *recvSocket;
+                               if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_ADD, 
*recvSocket, &ev) == -1) {
+                                       perror("epoll_ctl() EPOLL_CTL_ADD");
+                                       status = CELIX_SERVICE_EXCEPTION;
+                               }
+#endif
+                       }
+
+               }
+
+               if (status == CELIX_SUCCESS){
+                       hashMap_put(ts->socketMap, strdup(pubURL), 
(void*)recvSocket);
+               }
+               else{
+                       free(recvSocket);
+               }
+       }
+
+       celixThreadMutex_unlock(&ts->socketMap_lock);
+
+       return status;
+}
+
+celix_status_t 
pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt 
ts, char* pubURL) {
+       celix_status_t status = CELIX_SUCCESS;
+       char *url = strdup(pubURL);
+       celixThreadMutex_lock(&ts->pendingConnections_lock);
+       arrayList_add(ts->pendingConnections, url);
+       celixThreadMutex_unlock(&ts->pendingConnections_lock);
+       return status;
+}
+
+celix_status_t 
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt
 ts, char* pubURL) {
+       celix_status_t status = CELIX_SUCCESS;
+       char *url = strdup(pubURL);
+       celixThreadMutex_lock(&ts->pendingDisconnections_lock);
+       arrayList_add(ts->pendingDisconnections, url);
+       celixThreadMutex_unlock(&ts->pendingDisconnections_lock);
+       return status;
+}
+
+celix_status_t 
pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* 
pubURL){
+       printf("pubsub_topicSubscriptionDisconnectPublisher : pubURL = %s\n", 
pubURL);
+       celix_status_t status = CELIX_SUCCESS;
+       struct epoll_event ev;
+       memset(&ev, 0, sizeof(ev));
+
+       celixThreadMutex_lock(&ts->socketMap_lock);
+
+       if (hashMap_containsKey(ts->socketMap, pubURL)){
+
+#if defined(__APPLE__) && defined(__MACH__)
+               //TODO: Use kqueue for OSX
+#else
+               int *s = hashMap_remove(ts->socketMap, pubURL);
+               if(epoll_ctl(ts->topicEpollFd, EPOLL_CTL_DEL, *s, &ev) == -1) {
+                       printf("in if error()\n");
+                       perror("epoll_ctl() EPOLL_CTL_DEL");
+                       status = CELIX_SERVICE_EXCEPTION;
+               }
+               free(s);
+#endif
+
+       }
+
+       celixThreadMutex_unlock(&ts->socketMap_lock);
+
+       return status;
+}
+
+celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, 
pubsub_endpoint_pt subEP){
+       celix_status_t status = CELIX_SUCCESS;
+
+       celixThreadMutex_lock(&ts->ts_lock);
+       arrayList_add(ts->sub_ep_list,subEP);
+       celixThreadMutex_unlock(&ts->ts_lock);
+
+       return status;
+
+}
+
+celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt ts) {
+       celix_status_t status = CELIX_SUCCESS;
+
+       celixThreadMutex_lock(&ts->ts_lock);
+       ts->nrSubscribers++;
+       celixThreadMutex_unlock(&ts->ts_lock);
+
+       return status;
+}
+
+celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt 
ts, pubsub_endpoint_pt subEP){
+       celix_status_t status = CELIX_SUCCESS;
+
+       celixThreadMutex_lock(&ts->ts_lock);
+       arrayList_removeElement(ts->sub_ep_list,subEP);
+       celixThreadMutex_unlock(&ts->ts_lock);
+
+       return status;
+}
+
+celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt ts) {
+       celix_status_t status = CELIX_SUCCESS;
+
+       celixThreadMutex_lock(&ts->ts_lock);
+       ts->nrSubscribers--;
+       celixThreadMutex_unlock(&ts->ts_lock);
+
+       return status;
+}
+
+unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt ts) {
+       return ts->nrSubscribers;
+}
+
+array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt 
sub){
+       return sub->sub_ep_list;
+}
+
+
+static celix_status_t topicsub_subscriberTracked(void * handle, 
service_reference_pt reference, void * service){
+       celix_status_t status = CELIX_SUCCESS;
+       topic_subscription_pt ts = handle;
+
+       celixThreadMutex_lock(&ts->ts_lock);
+       if (!hashMap_containsKey(ts->servicesMap, service)) {
+               bundle_pt bundle = NULL;
+               hash_map_pt msgTypes = NULL;
+
+               serviceReference_getBundle(reference, &bundle);
+
+               if(ts->serializer != NULL && bundle!=NULL){
+                       
ts->serializer->createSerializerMap(ts->serializer->handle,bundle,&msgTypes);
+                       if(msgTypes != NULL){
+                               hashMap_put(ts->servicesMap, service, msgTypes);
+                               printf("PSA_UDP_MC_TS: New subscriber 
registered.\n");
+                       }
+               }
+               else{
+                       printf("PSA_UDP_MC_TS: Cannot register new 
subscriber.\n");
+                       status = CELIX_SERVICE_EXCEPTION;
+               }
+       }
+       celixThreadMutex_unlock(&ts->ts_lock);
+
+       return status;
+
+}
+
+static celix_status_t topicsub_subscriberUntracked(void * handle, 
service_reference_pt reference, void * service){
+       celix_status_t status = CELIX_SUCCESS;
+       topic_subscription_pt ts = handle;
+
+       celixThreadMutex_lock(&ts->ts_lock);
+       if (hashMap_containsKey(ts->servicesMap, service)) {
+               hash_map_pt msgTypes = hashMap_remove(ts->servicesMap, service);
+               if(msgTypes!=NULL && ts->serializer!=NULL){
+                       
ts->serializer->destroySerializerMap(ts->serializer->handle,msgTypes);
+                       printf("PSA_ZMQ_TS: Subscriber unregistered.\n");
+               }
+               else{
+                       printf("PSA_ZMQ_TS: Cannot unregister subscriber.\n");
+                       status = CELIX_SERVICE_EXCEPTION;
+               }
+       }
+       celixThreadMutex_unlock(&ts->ts_lock);
+
+       printf("PSA_UDP_MC_TS: Subscriber unregistered.\n");
+       return status;
+}
+
+
+static void process_msg(topic_subscription_pt sub,pubsub_udp_msg_t *msg){
+
+       celixThreadMutex_lock(&sub->ts_lock);
+       hash_map_iterator_pt iter = hashMapIterator_create(sub->servicesMap);
+       while (hashMapIterator_hasNext(iter)) {
+               hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
+               pubsub_subscriber_pt subsvc = hashMapEntry_getKey(entry);
+               hash_map_pt msgTypes = hashMapEntry_getValue(entry);
+
+               pubsub_msg_serializer_t *msgSer = 
hashMap_get(msgTypes,(void*)(uintptr_t )msg->header.type);
+               if (msgSer == NULL) {
+                       printf("PSA_UDP_MC_TS: Serializer not available for 
message %d.\n",msg->header.type);
+               }
+               else{
+                       void *msgInst = NULL;
+                       bool validVersion = 
checkVersion(msgSer->msgVersion,&msg->header);
+
+                       if(validVersion){
+
+                               celix_status_t status = 
msgSer->deserialize(msgSer, (const void *) msg->payload, 0, &msgInst);
+
+                               if (status == CELIX_SUCCESS) {
+                                       bool release = true;
+                                       pubsub_multipart_callbacks_t 
mp_callbacks;
+                                       mp_callbacks.handle = sub;
+                                       mp_callbacks.localMsgTypeIdForMsgType = 
pubsub_localMsgTypeIdForMsgType;
+                                       mp_callbacks.getMultipart = NULL;
+
+                                       subsvc->receive(subsvc->handle, 
msgSer->msgName, msg->header.type, msgInst, &mp_callbacks, &release);
+
+                                       if(release){
+                                               msgSer->freeMsg(msgSer,msgInst);
+                                       }
+                               }
+                               else{
+                                       printf("PSA_UDP_MC_TS: Cannot 
deserialize msgType %s.\n",msgSer->msgName);
+                               }
+
+                       }
+                       else{
+                               int major=0,minor=0;
+                               version_getMajor(msgSer->msgVersion,&major);
+                               version_getMinor(msgSer->msgVersion,&minor);
+                               printf("PSA_UDP_MC_TS: Version mismatch for 
primary message '%s' (have %d.%d, received %u.%u). NOT sending any part of the 
whole message.\n",
+                                               
msgSer->msgName,major,minor,msg->header.major,msg->header.minor);
+                       }
+
+               }
+       }
+       hashMapIterator_destroy(iter);
+       celixThreadMutex_unlock(&sub->ts_lock);
+}
+
+static void* udp_recv_thread_func(void * arg) {
+       topic_subscription_pt sub = (topic_subscription_pt) arg;
+
+#if defined(__APPLE__) && defined(__MACH__)
+       //TODO: use kqueue for OSX
+       //struct kevent events[MAX_EPOLL_EVENTS];
+       while (sub->running) {
+               int nfds = 0;
+               if(nfds > 0) {
+                       pubsub_udp_msg_t* udpMsg = NULL;
+                       process_msg(sub, udpMsg);
+               }
+       }
+#else
+       struct epoll_event events[MAX_EPOLL_EVENTS];
+
+       while (sub->running) {
+               int nfds = epoll_wait(sub->topicEpollFd, events, 
MAX_EPOLL_EVENTS, RECV_THREAD_TIMEOUT * 1000);
+               int i;
+               for(i = 0; i < nfds; i++ ) {
+                       unsigned int index;
+                       unsigned int size;
+                       if(largeUdp_dataAvailable(sub->largeUdpHandle, 
events[i].data.fd, &index, &size) == true) {
+                               // Handle data
+                               pubsub_udp_msg_t *udpMsg = NULL;
+                               if(largeUdp_read(sub->largeUdpHandle, index, 
(void**)&udpMsg, size) != 0) {
+                                       printf("PSA_UDP_MC_TS: ERROR 
largeUdp_read with index %d\n", index);
+                                       continue;
+                               }
+
+                               process_msg(sub, udpMsg);
+
+                               free(udpMsg);
+                       }
+               }
+               connectPendingPublishers(sub);
+               disconnectPendingPublishers(sub);
+       }
+#endif
+
+       return NULL;
+}
+
+static void connectPendingPublishers(topic_subscription_pt sub) {
+       celixThreadMutex_lock(&sub->pendingConnections_lock);
+       while(!arrayList_isEmpty(sub->pendingConnections)) {
+               char * pubEP = arrayList_remove(sub->pendingConnections, 0);
+               pubsub_topicSubscriptionConnectPublisher(sub, pubEP);
+               free(pubEP);
+       }
+       celixThreadMutex_unlock(&sub->pendingConnections_lock);
+}
+
+static void disconnectPendingPublishers(topic_subscription_pt sub) {
+       celixThreadMutex_lock(&sub->pendingDisconnections_lock);
+       while(!arrayList_isEmpty(sub->pendingDisconnections)) {
+               char * pubEP = arrayList_remove(sub->pendingDisconnections, 0);
+               pubsub_topicSubscriptionDisconnectPublisher(sub, pubEP);
+               free(pubEP);
+       }
+       celixThreadMutex_unlock(&sub->pendingDisconnections_lock);
+}
+
+static void sigusr1_sighandler(int signo){
+       printf("PSA_UDP_MC_TS: Topic subscription being shut down...\n");
+       return;
+}
+
+static bool checkVersion(version_pt msgVersion,pubsub_msg_header_pt hdr){
+       bool check=false;
+       int major=0,minor=0;
+
+       if(msgVersion!=NULL){
+               version_getMajor(msgVersion,&major);
+               version_getMinor(msgVersion,&minor);
+               if(hdr->major==((unsigned char)major)){ /* Different major 
means incompatible */
+                       check = (hdr->minor>=((unsigned char)minor)); /* 
Compatible only if the provider has a minor equals or greater (means compatible 
update) */
+               }
+       }
+
+       return check;
+}
+
+static int pubsub_localMsgTypeIdForMsgType(void* handle, const char* msgType, 
unsigned int* msgTypeId){
+       *msgTypeId = utils_stringHash(msgType);
+       return 0;
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h 
b/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h
new file mode 100644
index 0000000..475416a
--- /dev/null
+++ b/pubsub/pubsub_admin_udp_mc/src/topic_subscription.h
@@ -0,0 +1,60 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * topic_subscription.h
+ *
+ *  \date       Sep 22, 2015
+ *  \author            <a href="mailto:[email protected]";>Apache Celix 
Project Team</a>
+ *  \copyright Apache License, Version 2.0
+ */
+
+#ifndef TOPIC_SUBSCRIPTION_H_
+#define TOPIC_SUBSCRIPTION_H_
+
+#include "celix_threads.h"
+#include "array_list.h"
+#include "celixbool.h"
+#include "service_tracker.h"
+
+#include "pubsub_endpoint.h"
+#include "pubsub_common.h"
+#include "pubsub_serializer.h"
+
+typedef struct topic_subscription* topic_subscription_pt;
+
+celix_status_t pubsub_topicSubscriptionCreate(bundle_context_pt 
bundle_context, char* ifIp,char* scope, char* topic 
,pubsub_serializer_service_t *best_serializer, topic_subscription_pt* out);
+celix_status_t pubsub_topicSubscriptionDestroy(topic_subscription_pt ts);
+celix_status_t pubsub_topicSubscriptionStart(topic_subscription_pt ts);
+celix_status_t pubsub_topicSubscriptionStop(topic_subscription_pt ts);
+
+celix_status_t 
pubsub_topicSubscriptionAddConnectPublisherToPendingList(topic_subscription_pt 
ts, char* pubURL);
+celix_status_t 
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(topic_subscription_pt
 ts, char* pubURL);
+
+celix_status_t pubsub_topicSubscriptionConnectPublisher(topic_subscription_pt 
ts, char* pubURL);
+celix_status_t 
pubsub_topicSubscriptionDisconnectPublisher(topic_subscription_pt ts, char* 
pubURL);
+
+celix_status_t pubsub_topicSubscriptionAddSubscriber(topic_subscription_pt ts, 
pubsub_endpoint_pt subEP);
+celix_status_t pubsub_topicSubscriptionRemoveSubscriber(topic_subscription_pt 
ts, pubsub_endpoint_pt subEP);
+
+array_list_pt pubsub_topicSubscriptionGetSubscribersList(topic_subscription_pt 
sub);
+celix_status_t pubsub_topicIncreaseNrSubscribers(topic_subscription_pt 
subscription);
+celix_status_t pubsub_topicDecreaseNrSubscribers(topic_subscription_pt 
subscription);
+unsigned int pubsub_topicGetNrSubscribers(topic_subscription_pt subscription);
+
+#endif /*TOPIC_SUBSCRIPTION_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/2a670f26/pubsub/pubsub_admin_zmq/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/pubsub/pubsub_admin_zmq/CMakeLists.txt 
b/pubsub/pubsub_admin_zmq/CMakeLists.txt
index ab250f9..d71aedb 100644
--- a/pubsub/pubsub_admin_zmq/CMakeLists.txt
+++ b/pubsub/pubsub_admin_zmq/CMakeLists.txt
@@ -50,14 +50,15 @@ if (BUILD_PUBSUB_PSA_ZMQ)
                private/src/topic_subscription.c
                private/src/topic_publication.c
                ${ZMQ_CRYPTO_C}
-               ${PROJECT_SOURCE_DIR}/log_service/public/src/log_helper.c
                
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_endpoint.c
                
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_utils.c
           
${PROJECT_SOURCE_DIR}/pubsub/pubsub_common/public/src/pubsub_admin_match.c
        )
 
        set_target_properties(org.apache.celix.pubsub_admin.PubSubAdminZmq 
PROPERTIES INSTALL_RPATH "$ORIGIN")
-       target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminZmq 
celix_framework celix_utils celix_dfi ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} 
${OPENSSL_CRYPTO_LIBRARY})
+       target_link_libraries(org.apache.celix.pubsub_admin.PubSubAdminZmq 
PRIVATE
+                       Celix::framework Celix::dfi Celix::log_helper
+                       ${ZMQ_LIBRARIES} ${CZMQ_LIBRARIES} 
${OPENSSL_CRYPTO_LIBRARY})
        install_bundle(org.apache.celix.pubsub_admin.PubSubAdminZmq)
 
 endif()

Reply via email to