CELIX-454: Removes some now unused sources

Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/ffb97ffe
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/ffb97ffe
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/ffb97ffe

Branch: refs/heads/feature/CELIX-454-pubsub-disc
Commit: ffb97ffea85a3f0066ec04bf995863878fa9d21e
Parents: 2eb219e
Author: Pepijn Noltes <pepijnnol...@gmail.com>
Authored: Wed Sep 26 21:15:07 2018 +0200
Committer: Pepijn Noltes <pepijnnol...@gmail.com>
Committed: Wed Sep 26 21:15:07 2018 +0200

----------------------------------------------------------------------
 .../pubsub/pubsub_admin_udp_mc/CMakeLists.txt   |    3 -
 .../pubsub_admin_udp_mc/src/pubsub_admin_impl.c | 1221 ------------------
 .../pubsub_admin_udp_mc/src/pubsub_admin_impl.h |  111 --
 .../pubsub_admin_udp_mc/src/topic_publication.c |  453 -------
 .../pubsub_admin_udp_mc/src/topic_publication.h |   57 -
 .../src/topic_subscription.c                    |  635 ---------
 .../src/topic_subscription.h                    |   60 -
 7 files changed, 2540 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/ffb97ffe/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt 
b/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
index 3f74376..0703ceb 100644
--- a/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_udp_mc/CMakeLists.txt
@@ -27,9 +27,6 @@ add_celix_bundle(celix_pubsub_admin_udp_multicast
                src/pubsub_udpmc_topic_sender.c
                src/pubsub_udpmc_topic_receiver.c
                src/pubsub_udpmc_common.c
-        #src/pubsub_admin_impl.c
-        #src/topic_subscription.c
-        #src/topic_publication.c
         src/large_udp.c
 )
 target_include_directories(celix_pubsub_admin_udp_multicast PRIVATE

http://git-wip-us.apache.org/repos/asf/celix/blob/ffb97ffe/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c 
b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
deleted file mode 100644
index 2f62f6f..0000000
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.c
+++ /dev/null
@@ -1,1221 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#include <stdio.h>
-#include <stdlib.h>
-
-#ifndef ANDROID
-#include <ifaddrs.h>
-#endif
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <string.h>
-
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <netdb.h>
-#include <assert.h>
-
-#include "constants.h"
-#include "utils.h"
-#include "hash_map.h"
-#include "array_list.h"
-#include "bundle_context.h"
-#include "bundle.h"
-#include "service_reference.h"
-#include "service_registration.h"
-#include "log_helper.h"
-#include "log_service.h"
-#include "celix_threads.h"
-#include "service_factory.h"
-
-#include "pubsub_admin_impl.h"
-#include "topic_subscription.h"
-#include "topic_publication.h"
-#include "pubsub_endpoint.h"
-#include "pubsub/subscriber.h"
-#include "pubsub_utils.h"
-
-static const char *DEFAULT_MC_IP = "224.100.1.1";
-static char *DEFAULT_MC_PREFIX = "224.100";
-
-static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** 
ip);
-static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_t 
*admin,celix_properties_t *subEP);
-static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_t 
*admin,celix_properties_t *subEP);
-
-static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_t *admin, 
celix_properties_t *ep, pubsub_serializer_service_t **out, const char 
**serType);
-static void connectTopicPubSubToSerializer(pubsub_admin_t 
*admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool 
isPublication);
-static void disconnectTopicPubSubFromSerializer(pubsub_admin_t *admin,void 
*topicPubSub,bool isPublication);
-
-celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_t 
**admin) {
-       celix_status_t status = CELIX_SUCCESS;
-
-       *admin = calloc(1, sizeof(**admin));
-
-       if (!*admin) {
-               return CELIX_ENOMEM;
-       }
-
-       char *mc_ip = NULL;
-       char *if_ip = NULL;
-       int sendSocket = -1;
-
-       if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) {
-               logHelper_start((*admin)->loghelper);
-       }
-       const char *mc_ip_prop = NULL;
-       bundleContext_getProperty(context,PSA_IP , &mc_ip_prop);
-       if(mc_ip_prop) {
-               mc_ip = strdup(mc_ip_prop);
-       }
-
-#ifndef ANDROID
-       if (mc_ip == NULL) {
-               const char *mc_prefix = NULL;
-               const char *interface = NULL;
-               int b0 = 0, b1 = 0, b2 = 0, b3 = 0;
-               bundleContext_getProperty(context,PSA_MULTICAST_IP_PREFIX , 
&mc_prefix);
-               if(mc_prefix == NULL) {
-                       mc_prefix = DEFAULT_MC_PREFIX;
-               }
-
-               bundleContext_getProperty(context, PSA_ITF, &interface);
-               if (pubsubAdmin_getIpAddress(interface, &if_ip) != 
CELIX_SUCCESS) {
-                       logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: Could not retrieve IP address for 
interface %s", interface);
-               }
-
-               printf("IP Detected : %s\n", if_ip);
-               if(if_ip && sscanf(if_ip, "%i.%i.%i.%i", &b0, &b1, &b2, &b3) != 
4) {
-                       logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_WARNING, "PSA_UDP_MC: Could not parse IP address %s", if_ip);
-                       b2 = 1;
-                       b3 = 1;
-               }
-
-               asprintf(&mc_ip, "%s.%d.%d",mc_prefix, b2, b3);
-
-               sendSocket = socket(AF_INET, SOCK_DGRAM, 0);
-               if(sendSocket == -1) {
-                       perror("pubsubAdmin_create:socket");
-                       status = CELIX_SERVICE_EXCEPTION;
-               }
-
-               if(status == CELIX_SUCCESS){
-                       char loop = 1;
-                       if(setsockopt(sendSocket, IPPROTO_IP, 
IP_MULTICAST_LOOP, &loop, sizeof(loop)) != 0) {
-                               
perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_LOOP)");
-                               status = CELIX_SERVICE_EXCEPTION;
-                       }
-               }
-
-               if(status == CELIX_SUCCESS){
-                       struct in_addr multicast_interface;
-                       inet_aton(if_ip, &multicast_interface);
-                       if(setsockopt(sendSocket,  IPPROTO_IP, IP_MULTICAST_IF, 
&multicast_interface, sizeof(multicast_interface)) != 0) {
-                               
perror("pubsubAdmin_create:setsockopt(IP_MULTICAST_IF)");
-                               status = CELIX_SERVICE_EXCEPTION;
-                       }
-               }
-
-       }
-
-
-       if(status != CELIX_SUCCESS){
-               logHelper_stop((*admin)->loghelper);
-               logHelper_destroy(&((*admin)->loghelper));
-               if(sendSocket >=0){
-                       close(sendSocket);
-               }
-               if(if_ip != NULL){
-                       free(if_ip);
-               }
-               if(mc_ip != NULL){
-                       free(mc_ip);
-               }
-               return status;
-       }
-       else{
-               (*admin)->sendSocket = sendSocket;
-       }
-
-#endif
-
-       (*admin)->bundle_context= context;
-       (*admin)->localPublications = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
-       (*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
-       (*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
-       (*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
-       (*admin)->topicSubscriptionsPerSerializer = hashMap_create(NULL, NULL, 
NULL, NULL);
-       (*admin)->topicPublicationsPerSerializer  = hashMap_create(NULL, NULL, 
NULL, NULL);
-       arrayList_create(&((*admin)->noSerializerSubscriptions));
-       arrayList_create(&((*admin)->noSerializerPublications));
-       arrayList_create(&((*admin)->serializerList));
-
-       celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL);
-       celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL);
-       celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL);
-       celixThreadMutex_create(&(*admin)->serializerListLock, NULL);
-       celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL);
-
-       celixThreadMutexAttr_create(&(*admin)->noSerializerPendingsAttr);
-       celixThreadMutexAttr_settype(&(*admin)->noSerializerPendingsAttr, 
CELIX_THREAD_MUTEX_RECURSIVE);
-       celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, 
&(*admin)->noSerializerPendingsAttr);
-
-       celixThreadMutexAttr_create(&(*admin)->pendingSubscriptionsAttr);
-       celixThreadMutexAttr_settype(&(*admin)->pendingSubscriptionsAttr, 
CELIX_THREAD_MUTEX_RECURSIVE);
-       celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, 
&(*admin)->pendingSubscriptionsAttr);
-
-       if (if_ip != NULL) {
-               logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, 
"PSA_UDP_MC: Using %s as interface for multicast communication", if_ip);
-               (*admin)->ifIpAddress = if_ip;
-       } else {
-               (*admin)->ifIpAddress = strdup("127.0.0.1");
-       }
-
-       if (mc_ip != NULL) {
-               logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, 
"PSA_UDP_MC: Using %s for service annunciation", mc_ip);
-               (*admin)->mcIpAddress = mc_ip;
-       }
-       else {
-               logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, 
"PSA_UDP_MC: No IP address for service annunciation set. Using %s", 
DEFAULT_MC_IP);
-               (*admin)->mcIpAddress = strdup(DEFAULT_MC_IP);
-       }
-
-       (*admin)->defaultScore = PSA_UDPMC_DEFAULT_SCORE;
-       (*admin)->qosSampleScore = PSA_UDPMC_DEFAULT_QOS_SAMPLE_SCORE;
-       (*admin)->qosControlScore = PSA_UDPMC_DEFAULT_QOS_CONTROL_SCORE;
-
-       const char *defaultScoreStr = NULL;
-       const char *sampleScoreStr = NULL;
-       const char *controlScoreStr = NULL;
-       bundleContext_getProperty(context, PSA_UDPMC_DEFAULT_SCORE_KEY, 
&defaultScoreStr);
-       bundleContext_getProperty(context, PSA_UDPMC_QOS_SAMPLE_SCORE_KEY, 
&sampleScoreStr);
-       bundleContext_getProperty(context, PSA_UDPMC_QOS_CONTROL_SCORE_KEY, 
&controlScoreStr);
-
-       if (defaultScoreStr != NULL) {
-               (*admin)->defaultScore = strtof(defaultScoreStr, NULL);
-       }
-       if (sampleScoreStr != NULL) {
-               (*admin)->qosSampleScore = strtof(sampleScoreStr, NULL);
-       }
-       if (controlScoreStr != NULL) {
-               (*admin)->qosControlScore = strtof(controlScoreStr, NULL);
-       }
-
-       (*admin)->verbose = PSA_UDPMC_DEFAULT_VERBOSE;
-       const char *verboseStr = NULL;
-       bundleContext_getProperty(context, PSA_UDPMC_VERBOSE_KEY, &verboseStr);
-       if (verboseStr != NULL) {
-               (*admin)->verbose = strncasecmp("true", verboseStr, 
strlen("true")) == 0;
-       }
-
-       return status;
-}
-
-
-celix_status_t pubsubAdmin_destroy(pubsub_admin_t *admin)
-{
-       celix_status_t status = CELIX_SUCCESS;
-
-       free(admin->mcIpAddress);
-       free(admin->ifIpAddress);
-
-       celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
-       hash_map_iterator_pt iter = 
hashMapIterator_create(admin->pendingSubscriptions);
-       while(hashMapIterator_hasNext(iter)){
-               hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
-               free((char*)hashMapEntry_getKey(entry));
-               arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry));
-       }
-       hashMapIterator_destroy(iter);
-       hashMap_destroy(admin->pendingSubscriptions,false,false);
-       celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
-
-       celixThreadMutex_lock(&admin->subscriptionsLock);
-       hashMap_destroy(admin->subscriptions,false,false);
-       celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-       celixThreadMutex_lock(&admin->localPublicationsLock);
-       hashMap_destroy(admin->localPublications,true,false);
-       celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-       celixThreadMutex_lock(&admin->externalPublicationsLock);
-       iter = hashMapIterator_create(admin->externalPublications);
-       while(hashMapIterator_hasNext(iter)){
-               hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
-               free((char*)hashMapEntry_getKey(entry));
-               arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry));
-       }
-       hashMapIterator_destroy(iter);
-       hashMap_destroy(admin->externalPublications,false,false);
-       celixThreadMutex_unlock(&admin->externalPublicationsLock);
-
-       celixThreadMutex_lock(&admin->serializerListLock);
-       arrayList_destroy(admin->serializerList);
-       celixThreadMutex_unlock(&admin->serializerListLock);
-
-       celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-       arrayList_destroy(admin->noSerializerSubscriptions);
-       arrayList_destroy(admin->noSerializerPublications);
-       celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-
-       celixThreadMutex_lock(&admin->usedSerializersLock);
-
-       iter = hashMapIterator_create(admin->topicSubscriptionsPerSerializer);
-       while(hashMapIterator_hasNext(iter)){
-               
arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter));
-       }
-       hashMapIterator_destroy(iter);
-       hashMap_destroy(admin->topicSubscriptionsPerSerializer,false,false);
-
-       iter = hashMapIterator_create(admin->topicPublicationsPerSerializer);
-       while(hashMapIterator_hasNext(iter)){
-               
arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter));
-       }
-       hashMapIterator_destroy(iter);
-       hashMap_destroy(admin->topicPublicationsPerSerializer,false,false);
-
-       celixThreadMutex_unlock(&admin->usedSerializersLock);
-
-       celixThreadMutex_destroy(&admin->usedSerializersLock);
-       celixThreadMutex_destroy(&admin->serializerListLock);
-
-       celixThreadMutexAttr_destroy(&admin->noSerializerPendingsAttr);
-       celixThreadMutex_destroy(&admin->noSerializerPendingsLock);
-
-       celixThreadMutex_destroy(&admin->pendingSubscriptionsLock);
-       celixThreadMutexAttr_destroy(&admin->pendingSubscriptionsAttr);
-
-       celixThreadMutex_destroy(&admin->subscriptionsLock);
-       celixThreadMutex_destroy(&admin->localPublicationsLock);
-       celixThreadMutex_destroy(&admin->externalPublicationsLock);
-
-       logHelper_stop(admin->loghelper);
-
-       logHelper_destroy(&admin->loghelper);
-
-       free(admin);
-
-       return status;
-}
-
-static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_t 
*admin,celix_properties_t *subEP){
-       celix_status_t status = CELIX_SUCCESS;
-
-       celixThreadMutex_lock(&admin->subscriptionsLock);
-
-       topic_subscription_pt any_sub = 
hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
-
-       if(any_sub==NULL){
-
-               int i;
-               pubsub_serializer_service_t *best_serializer = NULL;
-               if( (status=pubsubAdmin_getBestSerializer(admin, subEP, 
&best_serializer, NULL)) == CELIX_SUCCESS){
-                       status = 
pubsub_topicSubscriptionCreate(admin->bundle_context, admin->ifIpAddress, 
PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, 
&any_sub);
-               }
-               else{
-                       if (admin->verbose) {
-                               printf("PSA_UDP_MC: Cannot find a serializer 
for subscribing topic %s. Adding it to pending list.\n",
-                                          properties_get(subEP, 
PUBSUB_ENDPOINT_TOPIC_NAME));
-                       }
-
-                       celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-                       arrayList_add(admin->noSerializerSubscriptions,subEP);
-                       
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-               }
-
-               if (status == CELIX_SUCCESS){
-
-                       /* Connect all internal publishers */
-                       celixThreadMutex_lock(&admin->localPublicationsLock);
-                       hash_map_iterator_pt lp_iter 
=hashMapIterator_create(admin->localPublications);
-                       while(hashMapIterator_hasNext(lp_iter)){
-                               service_factory_pt factory = 
(service_factory_pt)hashMapIterator_nextValue(lp_iter);
-                               topic_publication_pt topic_pubs = 
(topic_publication_pt)factory->handle;
-                               array_list_pt topic_publishers = 
pubsub_topicPublicationGetPublisherList(topic_pubs);
-
-                               if(topic_publishers!=NULL){
-                                       
for(i=0;i<arrayList_size(topic_publishers);i++){
-                                               celix_properties_t *pubEP = 
arrayList_get(topic_publishers,i);
-                                               if(properties_get(pubEP, 
PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
-                                                       status += 
pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP, 
PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
-                                               }
-                                       }
-                                       arrayList_destroy(topic_publishers);
-                               }
-                       }
-                       hashMapIterator_destroy(lp_iter);
-                       celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-                       /* Connect also all external publishers */
-                       celixThreadMutex_lock(&admin->externalPublicationsLock);
-                       hash_map_iterator_pt extp_iter 
=hashMapIterator_create(admin->externalPublications);
-                       while(hashMapIterator_hasNext(extp_iter)){
-                               array_list_pt ext_pub_list = 
(array_list_pt)hashMapIterator_nextValue(extp_iter);
-                               if(ext_pub_list!=NULL){
-                                       
for(i=0;i<arrayList_size(ext_pub_list);i++){
-                                               celix_properties_t *pubEP = 
arrayList_get(ext_pub_list,i);
-                                               if(properties_get(pubEP, 
PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
-                                                       status += 
pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP, 
PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
-                                               }
-                                       }
-                               }
-                       }
-                       hashMapIterator_destroy(extp_iter);
-                       
celixThreadMutex_unlock(&admin->externalPublicationsLock);
-
-
-                       pubsub_topicSubscriptionAddSubscriber(any_sub,subEP);
-
-                       status += pubsub_topicSubscriptionStart(any_sub);
-
-               }
-
-               if (status == CELIX_SUCCESS){
-                       
hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub);
-                       connectTopicPubSubToSerializer(admin, best_serializer, 
any_sub, false);
-               }
-
-       }
-
-       celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-       return status;
-}
-
-celix_status_t pubsubAdmin_addSubscription(pubsub_admin_t *admin, 
celix_properties_t *subEP){
-       celix_status_t status = CELIX_SUCCESS;
-
-       if(strcmp(properties_get(subEP, 
PUBSUB_ENDPOINT_TOPIC_NAME),PUBSUB_ANY_SUB_TOPIC)==0){
-               return pubsubAdmin_addAnySubscription(admin,subEP);
-       }
-
-       /* Check if we already know some publisher about this topic, otherwise 
let's put the subscription in the pending hashmap */
-       celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
-       celixThreadMutex_lock(&admin->subscriptionsLock);
-       celixThreadMutex_lock(&admin->localPublicationsLock);
-       celixThreadMutex_lock(&admin->externalPublicationsLock);
-
-       char* scope_topic = 
pubsubEndpoint_createScopeTopicKey(properties_get(subEP, 
PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME));
-
-       service_factory_pt factory = 
(service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
-       array_list_pt ext_pub_list = 
(array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
-
-       if (factory==NULL && ext_pub_list==NULL) { //No (local or external) 
publishers yet for this topic
-               pubsubAdmin_addSubscriptionToPendingList(admin,subEP);
-       } else {
-               int i;
-               topic_subscription_pt subscription = 
hashMap_get(admin->subscriptions, scope_topic);
-
-               if (subscription == NULL) {
-                       pubsub_serializer_service_t *best_serializer = NULL;
-            const char *serType = NULL;
-                       if( (status=pubsubAdmin_getBestSerializer(admin, subEP, 
&best_serializer, &serType)) == CELIX_SUCCESS){
-                               status += 
pubsub_topicSubscriptionCreate(admin->bundle_context,admin->ifIpAddress, 
(char*) properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) 
properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME), best_serializer, 
&subscription);
-                       } else {
-                               if (admin->verbose) {
-                                       printf("PSA_UDP_MC: Cannot find a 
serializer for subscribing topic %s. Adding it to pending list.\n",
-                                                  properties_get(subEP, 
PUBSUB_ENDPOINT_TOPIC_NAME));
-                               }
-
-                               
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-                               
arrayList_add(admin->noSerializerSubscriptions,subEP);
-                               
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-                       }
-
-                       if (status==CELIX_SUCCESS){
-                               //got type and serializer -> update endpoint
-                               celix_properties_set(subEP, 
PUBSUB_ENDPOINT_ADMIN_TYPE, PSA_UDPMC_PUBSUB_ADMIN_TYPE);
-                               celix_properties_set(subEP, 
PUBSUB_ENDPOINT_SERIALIZER, serType);
-
-                               /* Try to connect internal publishers */
-                               if(factory!=NULL){
-                                       topic_publication_pt topic_pubs = 
(topic_publication_pt)factory->handle;
-                                       array_list_pt topic_publishers = 
pubsub_topicPublicationGetPublisherList(topic_pubs);
-
-                                       if(topic_publishers!=NULL){
-                                               
for(i=0;i<arrayList_size(topic_publishers);i++){
-                                                       celix_properties_t 
*pubEP = arrayList_get(topic_publishers,i);
-                                                       
if(properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
-                                                               status += 
pubsub_topicSubscriptionConnectPublisher(subscription, (char*) 
properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
-                                                       }
-                                               }
-                                               
arrayList_destroy(topic_publishers);
-                                       }
-
-                               }
-
-                               /* Look also for external publishers */
-                               if(ext_pub_list!=NULL){
-                                       
for(i=0;i<arrayList_size(ext_pub_list);i++){
-                                               celix_properties_t *pubEP = 
arrayList_get(ext_pub_list,i);
-                                               if(properties_get(pubEP, 
PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) !=NULL){
-                                                       status += 
pubsub_topicSubscriptionConnectPublisher(subscription, (char*) 
properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
-                                               }
-                                       }
-                               }
-
-                               
pubsub_topicSubscriptionAddSubscriber(subscription,subEP);
-
-                               status += 
pubsub_topicSubscriptionStart(subscription);
-
-                       }
-
-                       if(status==CELIX_SUCCESS){
-
-                               
hashMap_put(admin->subscriptions,strdup(scope_topic),subscription);
-
-                               connectTopicPubSubToSerializer(admin, 
best_serializer, subscription, false);
-                       }
-               }
-
-               if (status == CELIX_SUCCESS){
-                       pubsub_topicIncreaseNrSubscribers(subscription);
-               }
-       }
-
-       free(scope_topic);
-       celixThreadMutex_unlock(&admin->externalPublicationsLock);
-       celixThreadMutex_unlock(&admin->localPublicationsLock);
-       celixThreadMutex_unlock(&admin->subscriptionsLock);
-       celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
-
-    if (admin->verbose) {
-        printf("[PSA_UDPMC] Added subscription [FWUUID=%s endpointUUID=%s 
scope=%s, topic=%s]\n",
-               properties_get(subEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
-               properties_get(subEP, PUBSUB_ENDPOINT_UUID),
-               properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-               properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME));
-        printf("[PSA_UDPMC] \t [psa type = %s, ser type = %s, pubsub endpoint 
type = %s]\n",
-               properties_get(subEP, PUBSUB_ADMIN_TYPE_KEY),
-               properties_get(subEP, PUBSUB_SERIALIZER_TYPE_KEY),
-               properties_get(subEP, PUBSUB_ENDPOINT_TYPE));
-        printf("[PSA_UDPMC] \t [endpoint socket address = %s]\n", 
properties_get(subEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
-    }
-
-       return status;
-
-}
-
-celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_t 
*admin,celix_properties_t *subEP){
-       celix_status_t status = CELIX_SUCCESS;
-
-    if (admin->verbose) {
-        printf("[PSA_UDPMC] Removing subscription [FWUUID=%s endpointUUID=%s 
scope=%s, topic=%s]\n",
-               properties_get(subEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
-               properties_get(subEP, PUBSUB_ENDPOINT_UUID),
-               properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-               properties_get(subEP, PUBSUB_ENDPOINT_TOPIC_NAME));
-        printf("[PSA_UDPMC] \t [psa type = %s, ser type = %s, pubsub endpoint 
type = %s]\n",
-               properties_get(subEP, PUBSUB_ADMIN_TYPE_KEY),
-               properties_get(subEP, PUBSUB_SERIALIZER_TYPE_KEY),
-               properties_get(subEP, PUBSUB_ENDPOINT_TYPE));
-        printf("[PSA_UDPMC] \t [endpoint url = %s]\n", properties_get(subEP, 
PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
-    }
-
-       char* scope_topic = 
pubsubEndpoint_createScopeTopicKey(properties_get(subEP, 
PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP, 
PUBSUB_ENDPOINT_TOPIC_NAME));
-
-       celixThreadMutex_lock(&admin->subscriptionsLock);
-       topic_subscription_pt sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
-       if(sub!=NULL){
-               pubsub_topicDecreaseNrSubscribers(sub);
-               if(pubsub_topicGetNrSubscribers(sub) == 0) {
-                       status = 
pubsub_topicSubscriptionRemoveSubscriber(sub,subEP);
-               }
-       }
-       celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-       if(sub==NULL){
-               /* Maybe the endpoint was pending */
-               celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-               if(!arrayList_removeElement(admin->noSerializerSubscriptions, 
subEP)){
-                       status = CELIX_ILLEGAL_STATE;
-               }
-               celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-       }
-
-       free(scope_topic);
-
-
-
-       return status;
-
-}
-
-celix_status_t pubsubAdmin_addPublication(pubsub_admin_t 
*admin,celix_properties_t *pubEP){
-       celix_status_t status = CELIX_SUCCESS;
-
-    const char* fwUUID = NULL;
-    
bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-    if(fwUUID==NULL){
-        printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n");
-        return CELIX_INVALID_BUNDLE_CONTEXT;
-    }
-
-    const char *epFwUUID = properties_get(pubEP, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID);
-    bool isOwn = strncmp(fwUUID, epFwUUID, 128) == 0;
-
-    if (isOwn) {
-        //should be null, willl be set in this call
-        assert(properties_get(pubEP, PUBSUB_ADMIN_TYPE_KEY) == NULL);
-        assert(properties_get(pubEP, PUBSUB_SERIALIZER_TYPE_KEY) == NULL);
-    }
-
-    if (isOwn) {
-        properties_set(pubEP, PUBSUB_ADMIN_TYPE_KEY, 
PSA_UDPMC_PUBSUB_ADMIN_TYPE);
-    }
-
-       char* scope_topic = 
pubsubEndpoint_createScopeTopicKey(properties_get(pubEP, 
PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP, 
PUBSUB_ENDPOINT_TOPIC_NAME));
-
-       if ((strcmp(properties_get(pubEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID), 
fwUUID) == 0) && (properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) == 
NULL)) {
-
-               celixThreadMutex_lock(&admin->localPublicationsLock);
-
-               service_factory_pt factory = (service_factory_pt) 
hashMap_get(admin->localPublications, scope_topic);
-
-               if (factory == NULL) {
-                       topic_publication_pt pub = NULL;
-                       pubsub_serializer_service_t *best_serializer = NULL;
-                       const char* serType = NULL;
-                       if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, 
&best_serializer, &serType)) == CELIX_SUCCESS){
-                               status = 
pubsub_topicPublicationCreate(admin->sendSocket, pubEP, best_serializer, 
serType, admin->mcIpAddress, &pub);
-                if (isOwn) {
-                    properties_set(pubEP, PUBSUB_SERIALIZER_TYPE_KEY, serType);
-                }
-                       } else {
-                               printf("PSA_UDP_MC: Cannot find a serializer 
for publishing topic %s. Adding it to pending list.\n",
-                                          properties_get(pubEP, 
PUBSUB_ENDPOINT_TOPIC_NAME));
-
-                               
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-                               
arrayList_add(admin->noSerializerPublications,pubEP);
-                               
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-                       }
-
-                       if (status == CELIX_SUCCESS) {
-                               status = 
pubsub_topicPublicationStart(admin->bundle_context, pub, &factory);
-                               if (status == CELIX_SUCCESS && factory != NULL) 
{
-                                       hashMap_put(admin->localPublications, 
strdup(scope_topic), factory);
-                                       connectTopicPubSubToSerializer(admin, 
best_serializer, pub, true);
-                               }
-                       } else {
-                               printf("PSA_UDP_MC: Cannot create a 
topicPublication for scope=%s, topic=%s.\n",
-                                          properties_get(pubEP, 
PUBSUB_ENDPOINT_TOPIC_SCOPE),
-                                          properties_get(pubEP, 
PUBSUB_ENDPOINT_TOPIC_NAME));
-                       }
-               } else {
-                       //just add the new EP to the list
-                       topic_publication_pt pub = (topic_publication_pt) 
factory->handle;
-                       pubsub_topicPublicationAddPublisherEP(pub, pubEP);
-               }
-
-               celixThreadMutex_unlock(&admin->localPublicationsLock);
-       } else {
-
-               celixThreadMutex_lock(&admin->externalPublicationsLock);
-               array_list_pt ext_pub_list = (array_list_pt) 
hashMap_get(admin->externalPublications, scope_topic);
-               if (ext_pub_list == NULL) {
-                       arrayList_create(&ext_pub_list);
-                       hashMap_put(admin->externalPublications, 
strdup(scope_topic), ext_pub_list);
-               }
-
-               arrayList_add(ext_pub_list, pubEP);
-
-               celixThreadMutex_unlock(&admin->externalPublicationsLock);
-       }
-
-       /* Re-evaluate the pending subscriptions */
-       celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
-
-       hash_map_entry_pt pendingSub = 
hashMap_getEntry(admin->pendingSubscriptions, scope_topic);
-       if (pendingSub != NULL) { //There were pending subscription for the 
just published topic. Let's connect them.
-               char* topic = (char*) hashMapEntry_getKey(pendingSub);
-               array_list_pt pendingSubList = (array_list_pt) 
hashMapEntry_getValue(pendingSub);
-               int i;
-               for (i = 0; i < arrayList_size(pendingSubList); i++) {
-                       celix_properties_t *subEP = 
arrayList_get(pendingSubList, i);
-                       pubsubAdmin_addSubscription(admin, subEP);
-               }
-               hashMap_remove(admin->pendingSubscriptions, scope_topic);
-               arrayList_clear(pendingSubList);
-               arrayList_destroy(pendingSubList);
-               free(topic);
-       }
-
-       celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
-
-       /* Connect the new publisher to the subscription for his topic, if 
there is any */
-       celixThreadMutex_lock(&admin->subscriptionsLock);
-
-       topic_subscription_pt sub = (topic_subscription_pt) 
hashMap_get(admin->subscriptions, scope_topic);
-       if (sub != NULL && properties_get(pubEP, 
PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) != NULL) {
-               pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, 
(char*) properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
-       }
-
-       /* And check also for ANY subscription */
-       topic_subscription_pt any_sub = (topic_subscription_pt) 
hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
-       if (any_sub != NULL && properties_get(pubEP, 
PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY) != NULL) {
-               
pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, (char*) 
properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
-       }
-
-       free(scope_topic);
-
-       celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-    if (admin->verbose) {
-        printf("PSA_UDPMC: Added publication [FWUUID=%s endpointUUID=%s 
scope=%s, topic=%s]\n",
-               properties_get(pubEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
-               properties_get(pubEP, PUBSUB_ENDPOINT_UUID),
-               properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-               properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME));
-        printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint 
type = %s]\n",
-               properties_get(pubEP, PUBSUB_ADMIN_TYPE_KEY),
-               properties_get(pubEP, PUBSUB_SERIALIZER_TYPE_KEY),
-               properties_get(pubEP, PUBSUB_ENDPOINT_TYPE));
-        printf("PSA_UDPMC: \t [endpoint url = %s, own = %i]\n",
-               properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY),
-               isOwn);
-    }
-
-       return status;
-
-}
-
-celix_status_t pubsubAdmin_removePublication(pubsub_admin_t 
*admin,celix_properties_t *pubEP){
-       celix_status_t status = CELIX_SUCCESS;
-       int count = 0;
-
-    if (admin->verbose) {
-        printf("PSA_UDPMC: Remove publication [FWUUID=%s endpointUUID=%s 
scope=%s, topic=%s]\n",
-               properties_get(pubEP, PUBSUB_ENDPOINT_FRAMEWORK_UUID),
-               properties_get(pubEP, PUBSUB_ENDPOINT_UUID),
-               properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_SCOPE),
-               properties_get(pubEP, PUBSUB_ENDPOINT_TOPIC_NAME));
-        printf("PSA_UDPMC: \t [psa type = %s, ser type = %s, pubsub endpoint 
type = %s]\n",
-               properties_get(pubEP, PUBSUB_ADMIN_TYPE_KEY),
-               properties_get(pubEP, PUBSUB_SERIALIZER_TYPE_KEY),
-               properties_get(pubEP, PUBSUB_ENDPOINT_TYPE));
-        printf("PSA_UDPMC: \t [endpoint url = %s]\n", properties_get(pubEP, 
PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
-    }
-
-       const char* fwUUID = NULL;
-
-       
bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-       if(fwUUID==NULL){
-               printf("PSA_UDP_MC: Cannot retrieve fwUUID.\n");
-               return CELIX_INVALID_BUNDLE_CONTEXT;
-       }
-       char *scope_topic = 
pubsubEndpoint_createScopeTopicKey(properties_get(pubEP, 
PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubEP, 
PUBSUB_ENDPOINT_TOPIC_NAME));
-
-       if(strcmp(properties_get(pubEP, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
-
-               celixThreadMutex_lock(&admin->localPublicationsLock);
-               service_factory_pt factory = 
(service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
-               if(factory!=NULL){
-                       topic_publication_pt pub = 
(topic_publication_pt)factory->handle;
-                       pubsub_topicPublicationRemovePublisherEP(pub,pubEP);
-               }
-               celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-               if(factory==NULL){
-                       /* Maybe the endpoint was pending */
-                       celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-                       
if(!arrayList_removeElement(admin->noSerializerPublications, pubEP)){
-                               status = CELIX_ILLEGAL_STATE;
-                       }
-                       
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-               }
-
-       }
-       else{
-
-               celixThreadMutex_lock(&admin->externalPublicationsLock);
-               array_list_pt ext_pub_list = 
(array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
-               if(ext_pub_list!=NULL){
-                       int i;
-                       bool found = false;
-                       for(i=0;!found && i<arrayList_size(ext_pub_list);i++){
-                               celix_properties_t *p  = 
arrayList_get(ext_pub_list,i);
-                               found = pubsubEndpoint_equals(pubEP,p);
-                               if (found){
-                                       arrayList_remove(ext_pub_list,i);
-                               }
-                       }
-                       // Check if there are more publishers on the same 
endpoint (happens when 1 celix-instance with multiple bundles publish in same 
topic)
-                       for(i=0; i<arrayList_size(ext_pub_list);i++) {
-                               celix_properties_t *p  = 
arrayList_get(ext_pub_list,i);
-                               if (strcmp(properties_get(pubEP, 
PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY),properties_get(p, 
PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)) == 0) {
-                                       count++;
-                               }
-                       }
-
-                       if(arrayList_size(ext_pub_list)==0){
-                               hash_map_entry_pt entry = 
hashMap_getEntry(admin->externalPublications,scope_topic);
-                               char* topic = (char*)hashMapEntry_getKey(entry);
-                               array_list_pt list = 
(array_list_pt)hashMapEntry_getValue(entry);
-                               
hashMap_remove(admin->externalPublications,topic);
-                               arrayList_destroy(list);
-                               free(topic);
-                       }
-               }
-
-               celixThreadMutex_unlock(&admin->externalPublicationsLock);
-       }
-
-       /* Check if this publisher was connected to one of our subscribers*/
-       celixThreadMutex_lock(&admin->subscriptionsLock);
-
-       topic_subscription_pt sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
-       if(sub!=NULL && properties_get(pubEP, 
PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL && count == 0){
-               
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub, (char*) 
properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
-       }
-
-       /* And check also for ANY subscription */
-       topic_subscription_pt any_sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
-       if(any_sub!=NULL && properties_get(pubEP, 
PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL && count == 0){
-               
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub, (char*) 
properties_get(pubEP, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY));
-       }
-
-       free(scope_topic);
-       celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-       return status;
-
-}
-
-celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_t *admin,char 
*scope, char* topic){
-       celix_status_t status = CELIX_SUCCESS;
-
-       printf("PSA_UDP_MC: Closing all publications for scope=%s,topic=%s\n", 
scope, topic);
-
-       celixThreadMutex_lock(&admin->localPublicationsLock);
-       char* scope_topic = pubsubEndpoint_createScopeTopicKey(scope, topic);
-       hash_map_entry_pt pubsvc_entry = 
(hash_map_entry_pt)hashMap_getEntry(admin->localPublications,scope_topic);
-       if(pubsvc_entry!=NULL){
-               char* key = (char*)hashMapEntry_getKey(pubsvc_entry);
-               service_factory_pt factory= 
(service_factory_pt)hashMapEntry_getValue(pubsvc_entry);
-               topic_publication_pt pub = 
(topic_publication_pt)factory->handle;
-               status += pubsub_topicPublicationStop(pub);
-               disconnectTopicPubSubFromSerializer(admin, pub, true);
-               status += pubsub_topicPublicationDestroy(pub);
-               hashMap_remove(admin->localPublications,scope_topic);
-               free(key);
-               free(factory);
-       }
-       free(scope_topic);
-       celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-       return status;
-
-}
-
-celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_t *admin,char 
*scope, char* topic){
-       celix_status_t status = CELIX_SUCCESS;
-
-       printf("PSA_UDP_MC: Closing all subscriptions\n");
-
-       celixThreadMutex_lock(&admin->subscriptionsLock);
-       char* scope_topic = pubsubEndpoint_createScopeTopicKey(scope, topic);
-       hash_map_entry_pt sub_entry = 
(hash_map_entry_pt)hashMap_getEntry(admin->subscriptions,scope_topic);
-       if(sub_entry!=NULL){
-               char* topic = (char*)hashMapEntry_getKey(sub_entry);
-
-               topic_subscription_pt ts = 
(topic_subscription_pt)hashMapEntry_getValue(sub_entry);
-               status += pubsub_topicSubscriptionStop(ts);
-               disconnectTopicPubSubFromSerializer(admin, ts, false);
-               status += pubsub_topicSubscriptionDestroy(ts);
-               hashMap_remove(admin->subscriptions,topic);
-               free(topic);
-
-       }
-       free(scope_topic);
-       celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-       return status;
-
-}
-
-
-#ifndef ANDROID
-static celix_status_t pubsubAdmin_getIpAddress(const char* interface, char** 
ip) {
-       celix_status_t status = CELIX_BUNDLE_EXCEPTION;
-
-       struct ifaddrs *ifaddr, *ifa;
-       char host[NI_MAXHOST];
-
-       if (getifaddrs(&ifaddr) != -1)
-       {
-               for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa 
= ifa->ifa_next)
-               {
-                       if (ifa->ifa_addr == NULL)
-                               continue;
-
-                       if ((getnameinfo(ifa->ifa_addr,sizeof(struct 
sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) && 
(ifa->ifa_addr->sa_family == AF_INET)) {
-                               if (interface == NULL) {
-                                       *ip = strdup(host);
-                                       status = CELIX_SUCCESS;
-                               }
-                               else if (strcmp(ifa->ifa_name, interface) == 0) 
{
-                                       *ip = strdup(host);
-                                       status = CELIX_SUCCESS;
-                               }
-                       }
-               }
-
-               freeifaddrs(ifaddr);
-       }
-
-       return status;
-}
-#endif
-
-static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_t 
*admin,celix_properties_t *subEP){
-       celix_status_t status = CELIX_SUCCESS;
-
-       char* scope_topic = 
pubsubEndpoint_createScopeTopicKey(properties_get(subEP, 
PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(subEP, 
PUBSUB_ENDPOINT_TOPIC_NAME));
-       array_list_pt pendingListPerTopic = 
hashMap_get(admin->pendingSubscriptions,scope_topic);
-       if(pendingListPerTopic==NULL){
-               arrayList_create(&pendingListPerTopic);
-               
hashMap_put(admin->pendingSubscriptions,strdup(scope_topic),pendingListPerTopic);
-       }
-       arrayList_add(pendingListPerTopic,subEP);
-       free(scope_topic);
-
-       return status;
-}
-
-
-celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt 
reference, void * service){
-       /* Assumption: serializers are all available at startup.
-        * If a new (possibly better) serializer is installed and started, 
already created topic_publications/subscriptions will not be destroyed and 
recreated */
-
-       celix_status_t status = CELIX_SUCCESS;
-       int i=0;
-
-       const char *serType = NULL;
-       serviceReference_getProperty(reference, 
PUBSUB_SERIALIZER_TYPE_KEY,&serType);
-       if(serType == NULL){
-               printf("PSA_UDPMC: Serializer serviceReference %p has no %s 
property specified\n",reference, PUBSUB_SERIALIZER_TYPE_KEY);
-               return CELIX_SERVICE_EXCEPTION;
-       }
-
-       pubsub_admin_t *admin = (pubsub_admin_t*)handle;
-       celixThreadMutex_lock(&admin->serializerListLock);
-       arrayList_add(admin->serializerList, reference);
-       celixThreadMutex_unlock(&admin->serializerListLock);
-
-       /* Now let's re-evaluate the pending */
-       celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-
-       for(i=0;i<arrayList_size(admin->noSerializerSubscriptions);i++){
-               celix_properties_t *ep = 
arrayList_get(admin->noSerializerSubscriptions,i);
-               pubsub_serializer_service_t *best_serializer = NULL;
-               pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, 
NULL);
-               if(best_serializer != NULL){ /* Finally we have a valid 
serializer! */
-                       pubsubAdmin_addSubscription(admin, ep);
-               }
-       }
-
-       for(i=0;i<arrayList_size(admin->noSerializerPublications);i++){
-               celix_properties_t *ep = 
arrayList_get(admin->noSerializerPublications,i);
-               pubsub_serializer_service_t *best_serializer = NULL;
-               pubsubAdmin_getBestSerializer(admin, ep, &best_serializer, 
NULL);
-               if(best_serializer != NULL){ /* Finally we have a valid 
serializer! */
-                       pubsubAdmin_addPublication(admin, ep);
-               }
-       }
-
-       celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-
-       if (admin->verbose) {
-               printf("PSA_UDP_MC: %s serializer added\n", serType);
-       }
-
-       return status;
-}
-
-celix_status_t pubsubAdmin_serializerRemoved(void * handle, 
service_reference_pt reference, void * service){
-
-       pubsub_admin_t *admin = (pubsub_admin_t*)handle;
-       int i=0, j=0;
-       const char *serType = NULL;
-
-       serviceReference_getProperty(reference, 
PUBSUB_SERIALIZER_TYPE_KEY,&serType);
-       if(serType == NULL){
-               printf("Serializer serviceReference %p has no %s property 
specified\n",reference, PUBSUB_SERIALIZER_TYPE_KEY);
-               return CELIX_SERVICE_EXCEPTION;
-       }
-
-       celixThreadMutex_lock(&admin->serializerListLock);
-       /* Remove the serializer from the list */
-       arrayList_removeElement(admin->serializerList, reference);
-       celixThreadMutex_unlock(&admin->serializerListLock);
-
-       celixThreadMutex_lock(&admin->usedSerializersLock);
-       array_list_pt topicPubList = 
(array_list_pt)hashMap_remove(admin->topicPublicationsPerSerializer, service);
-       array_list_pt topicSubList = 
(array_list_pt)hashMap_remove(admin->topicSubscriptionsPerSerializer, service);
-       celixThreadMutex_unlock(&admin->usedSerializersLock);
-
-       /* Now destroy the topicPublications, but first put back the 
pubsub_endpoints back to the noSerializer pending list */
-       if(topicPubList!=NULL){
-               for(i=0;i<arrayList_size(topicPubList);i++){
-                       topic_publication_pt topicPub = 
(topic_publication_pt)arrayList_get(topicPubList,i);
-                       /* Stop the topic publication */
-                       pubsub_topicPublicationStop(topicPub);
-                       /* Get the endpoints that are going to be orphan */
-                       array_list_pt pubList = 
pubsub_topicPublicationGetPublisherList(topicPub);
-                       for(j=0;j<arrayList_size(pubList);j++){
-                               celix_properties_t *pubEP = 
arrayList_get(pubList,j);
-                               /* Remove the publication */
-                               pubsubAdmin_removePublication(admin, pubEP);
-                               /* Reset the endpoint field, so that will be 
recreated from scratch when a new serializer will be found */
-                               if(properties_get(pubEP, 
PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL){
-                                       properties_unset(pubEP, 
PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY);
-                               }
-                               /* Add the orphan endpoint to the noSerializer 
pending list */
-                               
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-                               
arrayList_add(admin->noSerializerPublications,pubEP);
-                               
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-                       }
-                       arrayList_destroy(pubList);
-
-                       /* Cleanup also the localPublications hashmap*/
-                       celixThreadMutex_lock(&admin->localPublicationsLock);
-                       hash_map_iterator_pt iter = 
hashMapIterator_create(admin->localPublications);
-                       char *key = NULL;
-                       service_factory_pt factory = NULL;
-                       while(hashMapIterator_hasNext(iter)){
-                               hash_map_entry_pt entry = 
hashMapIterator_nextEntry(iter);
-                               factory = 
(service_factory_pt)hashMapEntry_getValue(entry);
-                               topic_publication_pt pub = 
(topic_publication_pt)factory->handle;
-                               if(pub==topicPub){
-                                       key = (char*)hashMapEntry_getKey(entry);
-                                       break;
-                               }
-                       }
-                       hashMapIterator_destroy(iter);
-                       if(key!=NULL){
-                               hashMap_remove(admin->localPublications, key);
-                               free(factory);
-                               free(key);
-                       }
-                       celixThreadMutex_unlock(&admin->localPublicationsLock);
-
-                       /* Finally destroy the topicPublication */
-                       pubsub_topicPublicationDestroy(topicPub);
-               }
-               arrayList_destroy(topicPubList);
-       }
-
-       /* Now destroy the topicSubscriptions, but first put back the 
pubsub_endpoints back to the noSerializer pending list */
-       if(topicSubList!=NULL){
-               for(i=0;i<arrayList_size(topicSubList);i++){
-                       topic_subscription_pt topicSub = 
(topic_subscription_pt)arrayList_get(topicSubList,i);
-                       /* Stop the topic subscription */
-                       pubsub_topicSubscriptionStop(topicSub);
-                       /* Get the endpoints that are going to be orphan */
-                       array_list_pt subList = 
pubsub_topicSubscriptionGetSubscribersList(topicSub);
-                       for(j=0;j<arrayList_size(subList);j++){
-                               celix_properties_t *subEP = 
arrayList_get(subList,j);
-                               /* Remove the subscription */
-                               pubsubAdmin_removeSubscription(admin, subEP);
-                               /* Reset the endpoint field, so that will be 
recreated from scratch when a new serializer will be found */
-                               if(properties_get(subEP, 
PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY)!=NULL){
-                                       properties_unset(subEP, 
PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY);
-                               }
-                               /* Add the orphan endpoint to the noSerializer 
pending list */
-                               
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
-                               
arrayList_add(admin->noSerializerSubscriptions,subEP);
-                               
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
-                       }
-
-                       /* Cleanup also the subscriptions hashmap*/
-                       celixThreadMutex_lock(&admin->subscriptionsLock);
-                       hash_map_iterator_pt iter = 
hashMapIterator_create(admin->subscriptions);
-                       char *key = NULL;
-                       while(hashMapIterator_hasNext(iter)){
-                               hash_map_entry_pt entry = 
hashMapIterator_nextEntry(iter);
-                               topic_subscription_pt sub = 
(topic_subscription_pt)hashMapEntry_getValue(entry);
-                               if(sub==topicSub){
-                                       key = (char*)hashMapEntry_getKey(entry);
-                                       break;
-                               }
-                       }
-                       hashMapIterator_destroy(iter);
-                       if(key!=NULL){
-                               hashMap_remove(admin->subscriptions, key);
-                               free(key);
-                       }
-                       celixThreadMutex_unlock(&admin->subscriptionsLock);
-
-                       /* Finally destroy the topicSubscription */
-                       pubsub_topicSubscriptionDestroy(topicSub);
-               }
-               arrayList_destroy(topicSubList);
-       }
-
-       if (admin->verbose) {
-               printf("PSA_UDP_MC: %s serializer removed\n", serType);
-       }
-
-
-       return CELIX_SUCCESS;
-}
-
-
-/* This one recall the same logic as in the match function */
-/*
-static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_t *admin, 
celix_properties_t *ep, pubsub_serializer_service_t **out, const char 
**serType){
-       celix_status_t status = CELIX_SUCCESS;
-
-       pubsub_serializer_service_t *serSvc = NULL;
-       service_reference_pt svcRef = NULL;
-
-       celixThreadMutex_lock(&admin->serializerListLock);
-       status = pubsub_admin_get_best_serializer(ep->properties, 
admin->serializerList, &svcRef);
-       celixThreadMutex_unlock(&admin->serializerListLock);
-
-       if (svcRef != NULL) {
-               bundleContext_getService(admin->bundle_context, svcRef, 
(void**)&serSvc);
-               bundleContext_ungetService(admin->bundle_context, svcRef, 
NULL); //TODO, FIXME this should not be done this way. only unget if the 
service is not used any more
-        if (serType != NULL) {
-            serviceReference_getProperty(svcRef, PUBSUB_SERIALIZER_TYPE_KEY, 
serType);
-        }
-       }
-
-       *out = serSvc;
-
-       return status;
-}*/
-
-static void connectTopicPubSubToSerializer(pubsub_admin_t 
*admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool 
isPublication){
-
-       celixThreadMutex_lock(&admin->usedSerializersLock);
-
-       hash_map_pt map = 
isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer;
-       array_list_pt list = (array_list_pt)hashMap_get(map,serializer);
-       if(list==NULL){
-               arrayList_create(&list);
-               hashMap_put(map,serializer,list);
-       }
-       arrayList_add(list,topicPubSub);
-
-       celixThreadMutex_unlock(&admin->usedSerializersLock);
-
-}
-
-static void disconnectTopicPubSubFromSerializer(pubsub_admin_t *admin,void 
*topicPubSub,bool isPublication){
-
-       celixThreadMutex_lock(&admin->usedSerializersLock);
-
-       hash_map_pt map = 
isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer;
-       hash_map_iterator_pt iter = hashMapIterator_create(map);
-       while(hashMapIterator_hasNext(iter)){
-               array_list_pt list = 
(array_list_pt)hashMapIterator_nextValue(iter);
-               if(arrayList_removeElement(list, topicPubSub)){ //Found it!
-                       break;
-               }
-       }
-       hashMapIterator_destroy(iter);
-
-       celixThreadMutex_unlock(&admin->usedSerializersLock);
-
-}
-
-celix_status_t pubsubAdmin_matchPublisher(void *handle, long 
svcRequesterBndId, const celix_filter_t *svcFilter, double *outScore, long 
*outSerializerSvcId) {
-       pubsub_admin_t *admin = handle;
-       celix_status_t status = CELIX_SUCCESS;
-       celixThreadMutex_lock(&admin->serializerListLock);
-       long serializerSvcId = -1L;
-       double score = pubsub_utils_matchPublisher(admin->bundle_context, 
svcRequesterBndId, svcFilter->filterStr, PSA_UDPMC_PUBSUB_ADMIN_TYPE, 
admin->qosSampleScore, admin->qosControlScore, admin->defaultScore, 
admin->serializerList, &serializerSvcId);
-       celixThreadMutex_unlock(&admin->serializerListLock);
-       if (outScore != NULL) {
-               *outScore = score;
-       }
-       if (outSerializerSvcId != NULL) {
-               *outSerializerSvcId = serializerSvcId;
-       }
-       return status;
-}
-
-celix_status_t pubsubAdmin_matchSubscriber(void *handle, long 
svcProviderBndId, const celix_properties_t *svcProperties, double *outScore, 
long *outSerializerSvcId) {
-       pubsub_admin_t *admin = handle;
-       celix_status_t status = CELIX_SUCCESS;
-       celixThreadMutex_lock(&admin->serializerListLock);
-       long serializerSvcId = -1L;
-       double score = pubsub_utils_matchSubscriber(admin->bundle_context, 
svcProviderBndId, svcProperties, PSA_UDPMC_PUBSUB_ADMIN_TYPE, 
admin->qosSampleScore, admin->qosControlScore, admin->defaultScore, 
admin->serializerList, &serializerSvcId);
-       celixThreadMutex_unlock(&admin->serializerListLock);
-       if (outScore != NULL) {
-               *outScore = score;
-       }
-       if (outSerializerSvcId != NULL) {
-               *outSerializerSvcId = serializerSvcId;
-       }
-       return status;
-}
-
-celix_status_t pubsubAdmin_matchEndpoint(void *handle, const 
celix_properties_t *endpoint, double *outScore) {
-       pubsub_admin_t *admin = handle;
-       celix_status_t status = CELIX_SUCCESS;
-       celixThreadMutex_lock(&admin->serializerListLock);
-       long serializerSvcId = -1L;
-       double score = pubsub_utils_matchEndpoint(admin->bundle_context, 
endpoint, PSA_UDPMC_PUBSUB_ADMIN_TYPE, admin->qosSampleScore, 
admin->qosControlScore, admin->defaultScore, admin->serializerList, 
&serializerSvcId);
-       celixThreadMutex_unlock(&admin->serializerListLock);
-       if (outScore != NULL) {
-               *outScore = score;
-       }
-       return status;
-}
-
-celix_status_t pubsubAdmin_setupTopicSender(void *handle, const char *scope, 
const char *topic, long serializerSvcId, celix_properties_t 
**publisherEndpoint) {
-       pubsub_admin_t *admin = handle;
-       celix_status_t status = CELIX_SUCCESS;
-       return status;
-}
-celix_status_t pubsubAdmin_teardownTopicSender(void *handle, const char 
*scope, const char *topic) {
-       pubsub_admin_t *admin = handle;
-       celix_status_t status = CELIX_SUCCESS;
-       return status;
-}
-
-celix_status_t pubsubAdmin_setupTopicReciever(void *handle, const char *scope, 
const char *topic, long serializerSvcId, celix_properties_t 
**subscriberEndpoint) {
-       pubsub_admin_t *admin = handle;
-       celix_status_t status = CELIX_SUCCESS;
-       return status;
-}
-
-celix_status_t pubsubAdmin_teardownTopicReciever(void *handle, const char 
*scope, const char *topic) {
-       pubsub_admin_t *admin = handle;
-       celix_status_t status = CELIX_SUCCESS;
-       return status;
-}
-
-celix_status_t pubsubAdmin_addEndpoint(void *handle, const celix_properties_t 
*endpoint) {
-       pubsub_admin_t *admin = handle;
-       celix_status_t status = CELIX_SUCCESS;
-       return status;
-}
-
-celix_status_t pubsubAdmin_removeEndpoint(void *handle, const 
celix_properties_t *endpoint) {
-       pubsub_admin_t *admin = handle;
-       celix_status_t status = CELIX_SUCCESS;
-       return status;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/celix/blob/ffb97ffe/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h 
b/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
deleted file mode 100644
index 7618a6e..0000000
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/pubsub_admin_impl.h
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * pubsub_admin_impl.h
- *
- *  \date       Dec 5, 2013
- *  \author            <a href="mailto:d...@celix.apache.org">Apache Celix 
Project Team</a>
- *  \copyright Apache License, Version 2.0
- */
-
-#ifndef PUBSUB_ADMIN_UDP_MC_IMPL_H_
-#define PUBSUB_ADMIN_UDP_MC_IMPL_H_
-
-#include "pubsub_psa_udpmc_constants.h"
-#include "pubsub_admin.h"
-#include "log_helper.h"
-
-
-#define PUBSUB_PSA_UDPMC_PSA_TYPE                                      "udp_mc"
-#define PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY                    
"pubsub.udpmc.socket_address"
-
-typedef struct pubsub_admin {
-
-       bundle_context_pt bundle_context;
-       log_helper_pt loghelper;
-
-       /* List of the available serializers */
-       celix_thread_mutex_t serializerListLock; // List<serializers>
-       array_list_pt serializerList;
-
-       celix_thread_mutex_t localPublicationsLock;
-       hash_map_pt localPublications;//<topic(string),service_factory_pt>
-
-       celix_thread_mutex_t externalPublicationsLock;
-       hash_map_pt externalPublications;//<topic(string),List<pubsub_ep>>
-
-       celix_thread_mutex_t subscriptionsLock;
-       hash_map_pt subscriptions; //<topic(string),topic_subscription>
-
-       celix_thread_mutex_t pendingSubscriptionsLock;
-       celix_thread_mutexattr_t pendingSubscriptionsAttr;
-       hash_map_pt pendingSubscriptions; //<topic(string),List<pubsub_ep>>
-
-       /* Those are used to keep track of valid subscriptions/publications 
that still have no valid serializer */
-       celix_thread_mutex_t noSerializerPendingsLock;
-       celix_thread_mutexattr_t noSerializerPendingsAttr;
-       array_list_pt noSerializerSubscriptions; // List<pubsub_ep>
-       array_list_pt noSerializerPublications; // List<pubsub_ep>
-
-       celix_thread_mutex_t usedSerializersLock;
-       hash_map_pt topicSubscriptionsPerSerializer; // 
<serializer,List<topicSubscription>>
-       hash_map_pt topicPublicationsPerSerializer; // 
<serializer,List<topicPublications>>
-
-       char* ifIpAddress; // The local interface which is used for multicast 
communication
-       char* mcIpAddress; // The multicast IP address
-
-       int sendSocket;
-
-
-       double qosSampleScore;
-       double qosControlScore;
-       double defaultScore;
-
-       bool verbose;
-} pubsub_admin_t;
-
-celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_t 
**admin);
-celix_status_t pubsubAdmin_destroy(pubsub_admin_t *admin);
-
-
-void pubsubAdmin_addSerializer(void * handle, void *svc, const 
celix_properties_t *properties);
-void pubsubAdmin_removeSerializer(void * handle, void *svc, const 
celix_properties_t *properties);
-
-
-
-//for the pubsub_admin_service
-
-celix_status_t pubsubAdmin_matchPublisher(void *handle, long 
svcRequesterBndId, const celix_filter_t *svcFilter, double *score, long 
*serializerSvcId);
-celix_status_t pubsubAdmin_matchSubscriber(void *handle, long 
svcProviderBndId, const celix_properties_t *svcProperties, double *score, long 
*serializerSvcId);
-celix_status_t pubsubAdmin_matchEndpoint(void *handle, const 
celix_properties_t *endpoint, double *score);
-
-//note endpoint is owned by caller
-celix_status_t pubsubAdmin_setupTopicSender(void *handle, const char *scope, 
const char *topic, long serializerSvcId, celix_properties_t 
**publisherEndpoint);
-celix_status_t pubsubAdmin_teardownTopicSender(void *handle, const char 
*scope, const char *topic);
-
-//note endpoint is owned by caller
-celix_status_t pubsubAdmin_setupTopicReciever(void *handle, const char *scope, 
const char *topic, long serializerSvcId, celix_properties_t 
**subscriberEndpoint);
-celix_status_t pubsubAdmin_teardownTopicReciever(void *handle, const char 
*scope, const char *topic);
-
-celix_status_t pubsubAdmin_addEndpoint(void *handle, const celix_properties_t 
*endpoint);
-celix_status_t pubsubAdmin_removeEndpoint(void *handle, const 
celix_properties_t *endpoint);
-
-
-
-#endif /* PUBSUB_ADMIN_UDP_MC_IMPL_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/ffb97ffe/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c 
b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
deleted file mode 100644
index 2fc80ee..0000000
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.c
+++ /dev/null
@@ -1,453 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-#include <errno.h>
-
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <celix_api.h>
-
-#include "array_list.h"
-#include "celixbool.h"
-#include "service_registration.h"
-#include "utils.h"
-#include "service_factory.h"
-#include "version.h"
-
-#include "topic_publication.h"
-#include "pubsub_common.h"
-#include "pubsub/publisher.h"
-#include "large_udp.h"
-
-#include "pubsub_serializer.h"
-#include "pubsub_psa_udpmc_constants.h"
-#include "pubsub_admin_impl.h"
-
-#define EP_ADDRESS_LEN         32
-
-#define FIRST_SEND_DELAY       2
-
-struct topic_publication {
-       int sendSocket;
-       char* endpoint;
-       service_registration_pt svcFactoryReg;
-       array_list_pt pub_ep_list; //List<pubsub_endpoint>
-       hash_map_pt boundServices; //<bundle_pt,bound_service>
-       celix_thread_mutex_t tp_lock;
-       struct {
-               const char* type;
-               pubsub_serializer_service_t* svc;
-       } serializer;
-       struct sockaddr_in destAddr;
-};
-
-typedef struct publish_bundle_bound_service {
-       topic_publication_pt parent;
-       pubsub_publisher_t service;
-       bundle_pt bundle;
-       char *scope;
-       char *topic;
-       hash_map_pt msgTypes;
-       unsigned short getCount;
-       celix_thread_mutex_t mp_lock;
-       largeUdp_pt largeUdpHandle;
-}* publish_bundle_bound_service_pt;
-
-
-typedef struct pubsub_msg{
-       pubsub_msg_header_t *header;
-       char* payload;
-       unsigned int payloadSize;
-} pubsub_msg_t;
-
-
-static unsigned int rand_range(unsigned int min, unsigned int max);
-
-static celix_status_t pubsub_topicPublicationGetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service);
-static celix_status_t pubsub_topicPublicationUngetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service);
-
-static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle);
-static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt 
boundSvc);
-
-static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, 
const void *msg);
-
-static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, 
unsigned int* msgTypeId);
-
-
-static void delay_first_send_for_late_joiners(void);
-
-
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, 
celix_properties_t *pubEP, pubsub_serializer_service_t *best_serializer, const 
char* best_serializer_type, char* bindIP, topic_publication_pt *out){
-
-       char* ep = malloc(EP_ADDRESS_LEN);
-       memset(ep,0,EP_ADDRESS_LEN);
-
-       long serviceId = celix_properties_getAsLong(pubEP, 
OSGI_FRAMEWORK_SERVICE_ID, 0);
-
-       unsigned int port = serviceId + rand_range(UDP_BASE_PORT+serviceId+3, 
UDP_MAX_PORT);
-       snprintf(ep,EP_ADDRESS_LEN,"udp://%s:%u",bindIP,port);
-
-
-       topic_publication_pt pub = calloc(1,sizeof(*pub));
-
-       arrayList_create(&(pub->pub_ep_list));
-       pub->boundServices = hashMap_create(NULL,NULL,NULL,NULL);
-       celixThreadMutex_create(&(pub->tp_lock),NULL);
-
-       pub->endpoint = ep;
-       pub->sendSocket = sendSocket;
-       pub->destAddr.sin_family = AF_INET;
-       pub->destAddr.sin_addr.s_addr = inet_addr(bindIP);
-       pub->destAddr.sin_port = htons(port);
-
-       pub->serializer.type = best_serializer_type;
-       pub->serializer.svc = best_serializer;
-
-       pubsub_topicPublicationAddPublisherEP(pub, pubEP);
-
-       *out = pub;
-
-       return CELIX_SUCCESS;
-}
-
-celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
-       celix_status_t status = CELIX_SUCCESS;
-
-       celixThreadMutex_lock(&(pub->tp_lock));
-
-       free(pub->endpoint);
-       arrayList_destroy(pub->pub_ep_list);
-
-       hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices);
-       while(hashMapIterator_hasNext(iter)){
-               publish_bundle_bound_service_pt bound = 
hashMapIterator_nextValue(iter);
-               pubsub_destroyPublishBundleBoundService(bound);
-       }
-       hashMapIterator_destroy(iter);
-       hashMap_destroy(pub->boundServices,false,false);
-
-       pub->svcFactoryReg = NULL;
-       pub->serializer.svc= NULL;
-       pub->serializer.type= NULL;
-
-       if(close(pub->sendSocket) != 0){
-               status = CELIX_FILE_IO_EXCEPTION;
-       }
-
-       celixThreadMutex_unlock(&(pub->tp_lock));
-
-       celixThreadMutex_destroy(&(pub->tp_lock));
-
-       free(pub);
-
-       return status;
-}
-
-celix_status_t pubsub_topicPublicationStart(bundle_context_pt 
bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory){
-       celix_status_t status = CELIX_SUCCESS;
-
-       /* Let's register the new service */
-
-       celix_properties_t *pubEP = arrayList_get(pub->pub_ep_list,0);
-
-       if(pubEP!=NULL){
-               service_factory_pt factory = calloc(1, sizeof(*factory));
-               factory->handle = pub;
-               factory->getService = pubsub_topicPublicationGetService;
-               factory->ungetService = pubsub_topicPublicationUngetService;
-
-               properties_pt props = properties_create();
-               
properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP, 
PUBSUB_ENDPOINT_TOPIC_SCOPE));
-               
properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP, 
PUBSUB_ENDPOINT_TOPIC_NAME));
-                properties_set(props,"service.version", 
PUBSUB_PUBLISHER_SERVICE_VERSION);
-
-
-               status = 
bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
-
-               if(status != CELIX_SUCCESS){
-                       properties_destroy(props);
-                       printf("[PSA UDPMC] Cannot register ServiceFactory for 
topic %s, topic %s.\n",
-                                  properties_get(pubEP, 
PUBSUB_ENDPOINT_TOPIC_SCOPE),
-                                  properties_get(pubEP, 
PUBSUB_ENDPOINT_TOPIC_NAME));
-               } else {
-                       *svcFactory = factory;
-               }
-       }
-       else{
-               printf("PSA_UDP_MC_PSA_UDP_MC_TP: Cannot find pubsub_endpoint 
after adding it...Should never happen!\n");
-               status = CELIX_SERVICE_EXCEPTION;
-       }
-
-       return status;
-}
-
-celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
-       return serviceRegistration_unregister(pub->svcFactoryReg);
-}
-
-celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, 
celix_properties_t *ep) {
-
-       celixThreadMutex_lock(&(pub->tp_lock));
-       celix_properties_set(ep, PUBSUB_PSA_UDPMC_SOCKET_ADDRESS_KEY, 
pub->endpoint);
-       celix_properties_set(ep, PUBSUB_ADMIN_TYPE_KEY, 
PSA_UDPMC_PUBSUB_ADMIN_TYPE);
-       celix_properties_set(ep, PUBSUB_SERIALIZER_TYPE_KEY, 
pub->serializer.type);
-       arrayList_add(pub->pub_ep_list,ep);
-       celixThreadMutex_unlock(&(pub->tp_lock));
-
-       return CELIX_SUCCESS;
-}
-
-celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt 
pub,celix_properties_t *ep) {
-
-       celixThreadMutex_lock(&(pub->tp_lock));
-       arrayList_removeElement(pub->pub_ep_list,ep);
-       celixThreadMutex_unlock(&(pub->tp_lock));
-
-       return CELIX_SUCCESS;
-}
-
-array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt 
pub){
-       array_list_pt list = NULL;
-       celixThreadMutex_lock(&(pub->tp_lock));
-       list = arrayList_clone(pub->pub_ep_list);
-       celixThreadMutex_unlock(&(pub->tp_lock));
-       return list;
-}
-
-
-static celix_status_t pubsub_topicPublicationGetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service) {
-       celix_status_t  status = CELIX_SUCCESS;
-
-       topic_publication_pt publish = (topic_publication_pt)handle;
-
-       celixThreadMutex_lock(&(publish->tp_lock));
-
-       publish_bundle_bound_service_pt bound = 
(publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
-       if(bound==NULL){
-               bound = pubsub_createPublishBundleBoundService(publish,bundle);
-               if(bound!=NULL){
-                       hashMap_put(publish->boundServices,bundle,bound);
-               }
-       }
-       else{
-               bound->getCount++;
-       }
-
-       if (bound != NULL) {
-               *service = &bound->service;
-       }
-
-       celixThreadMutex_unlock(&(publish->tp_lock));
-
-       return status;
-}
-
-static celix_status_t pubsub_topicPublicationUngetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service)  {
-
-       topic_publication_pt publish = (topic_publication_pt)handle;
-
-       celixThreadMutex_lock(&(publish->tp_lock));
-
-       publish_bundle_bound_service_pt bound = 
(publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
-       if(bound!=NULL){
-
-               bound->getCount--;
-               if(bound->getCount==0){
-                       pubsub_destroyPublishBundleBoundService(bound);
-                       hashMap_remove(publish->boundServices,bundle);
-               }
-
-       }
-       else{
-               long bundleId = -1;
-               bundle_getBundleId(bundle,&bundleId);
-               printf("PSA_UDP_MC_TP: Unexpected ungetService call for bundle 
%ld.\n", bundleId);
-       }
-
-       /* service should be never used for unget, so let's set the pointer to 
NULL */
-       *service = NULL;
-
-       celixThreadMutex_unlock(&(publish->tp_lock));
-
-       return CELIX_SUCCESS;
-}
-
-static bool send_pubsub_msg(publish_bundle_bound_service_pt bound, 
pubsub_msg_t* msg, bool last, pubsub_release_callback_t *releaseCallback){
-       const int iovec_len = 3; // header + size + payload
-       bool ret = true;
-
-       struct iovec msg_iovec[iovec_len];
-       msg_iovec[0].iov_base = msg->header;
-       msg_iovec[0].iov_len = sizeof(*msg->header);
-       msg_iovec[1].iov_base = &msg->payloadSize;
-       msg_iovec[1].iov_len = sizeof(msg->payloadSize);
-       msg_iovec[2].iov_base = msg->payload;
-       msg_iovec[2].iov_len = msg->payloadSize;
-
-       delay_first_send_for_late_joiners();
-
-       if(largeUdp_sendmsg(bound->largeUdpHandle, bound->parent->sendSocket, 
msg_iovec, iovec_len, 0, &bound->parent->destAddr, 
sizeof(bound->parent->destAddr)) == -1) {
-               perror("send_pubsub_msg:sendSocket");
-               ret = false;
-       }
-
-       if(releaseCallback) {
-               releaseCallback->release(msg->payload, bound);
-       }
-       return ret;
-
-}
-
-
-static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, 
const void *inMsg) {
-       int status = 0;
-       publish_bundle_bound_service_pt bound = 
(publish_bundle_bound_service_pt) handle;
-
-       celixThreadMutex_lock(&(bound->parent->tp_lock));
-       celixThreadMutex_lock(&(bound->mp_lock));
-
-       pubsub_msg_serializer_t* msgSer = 
(pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, 
(void*)(intptr_t)msgTypeId);
-
-       if (msgSer != NULL) {
-               int major=0, minor=0;
-
-               pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct 
pubsub_msg_header));
-               strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
-               msg_hdr->type = msgTypeId;
-
-
-               if (msgSer->msgVersion != NULL){
-                       version_getMajor(msgSer->msgVersion, &major);
-                       version_getMinor(msgSer->msgVersion, &minor);
-                       msg_hdr->major = major;
-                       msg_hdr->minor = minor;
-               }
-
-               void* serializedOutput = NULL;
-               size_t serializedOutputLen = 0;
-               msgSer->serialize(msgSer,inMsg,&serializedOutput, 
&serializedOutputLen);
-
-               pubsub_msg_t *msg = calloc(1,sizeof(pubsub_msg_t));
-               msg->header = msg_hdr;
-               msg->payload = (char*)serializedOutput;
-               msg->payloadSize = serializedOutputLen;
-
-
-               if(send_pubsub_msg(bound, msg,true, NULL) == false) {
-                       status = -1;
-               }
-               free(msg_hdr);
-               free(msg);
-               free(serializedOutput);
-
-
-       } else {
-               printf("PSA_UDP_MC_TP: No msg serializer available for msg type 
id %d\n", msgTypeId);
-               status=-1;
-       }
-
-       celixThreadMutex_unlock(&(bound->mp_lock));
-       celixThreadMutex_unlock(&(bound->parent->tp_lock));
-
-       return status;
-}
-
-static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, 
unsigned int* msgTypeId){
-       *msgTypeId = utils_stringHash(msgType);
-       return 0;
-}
-
-
-static unsigned int rand_range(unsigned int min, unsigned int max){
-
-       double scaled = (double)(((double)random())/((double)RAND_MAX));
-       return (max-min+1)*scaled + min;
-
-}
-
-static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle){
-
-       publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
-
-       if (bound != NULL) {
-
-               bound->parent = tp;
-               bound->bundle = bundle;
-               bound->getCount = 1;
-               celixThreadMutex_create(&bound->mp_lock,NULL);
-
-               if (tp->serializer.svc != NULL){
-                       
tp->serializer.svc->createSerializerMap(tp->serializer.svc->handle,bundle,&bound->msgTypes);
-               }
-
-               celix_properties_t *pubEP = 
arrayList_get(bound->parent->pub_ep_list,0);
-               bound->scope=strdup(properties_get(pubEP, 
PUBSUB_ENDPOINT_TOPIC_SCOPE));
-               bound->topic=strdup(properties_get(pubEP, 
PUBSUB_ENDPOINT_TOPIC_NAME));
-               bound->largeUdpHandle = largeUdp_create(1);
-
-               bound->service.handle = bound;
-               bound->service.localMsgTypeIdForMsgType = 
pubsub_localMsgTypeIdForUUID;
-               bound->service.send = pubsub_topicPublicationSend;
-               bound->service.sendMultipart = NULL;  //Multipart not supported 
for UDP
-
-       }
-
-       return bound;
-}
-
-static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt 
boundSvc){
-
-       celixThreadMutex_lock(&boundSvc->mp_lock);
-
-       if(boundSvc->parent->serializer.svc != NULL && boundSvc->msgTypes != 
NULL){
-               
boundSvc->parent->serializer.svc->destroySerializerMap(boundSvc->parent->serializer.svc->handle,
 boundSvc->msgTypes);
-       }
-
-       if(boundSvc->scope!=NULL){
-               free(boundSvc->scope);
-       }
-
-       if(boundSvc->topic!=NULL){
-               free(boundSvc->topic);
-       }
-
-       largeUdp_destroy(boundSvc->largeUdpHandle);
-
-       celixThreadMutex_unlock(&boundSvc->mp_lock);
-       celixThreadMutex_destroy(&boundSvc->mp_lock);
-
-       free(boundSvc);
-
-}
-
-static void delay_first_send_for_late_joiners(){
-
-       static bool firstSend = true;
-
-       if(firstSend){
-               printf("PSA_UDP_MC_TP: Delaying first send for late 
joiners...\n");
-               sleep(FIRST_SEND_DELAY);
-               firstSend = false;
-       }
-}

http://git-wip-us.apache.org/repos/asf/celix/blob/ffb97ffe/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.h
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.h 
b/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.h
deleted file mode 100644
index 0dc89fb..0000000
--- a/bundles/pubsub/pubsub_admin_udp_mc/src/topic_publication.h
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-/*
- * topic_publication.h
- *
- *  \date       Sep 24, 2015
- *  \author            <a href="mailto:d...@celix.apache.org">Apache Celix 
Project Team</a>
- *  \copyright Apache License, Version 2.0
- */
-
-#ifndef TOPIC_PUBLICATION_H_
-#define TOPIC_PUBLICATION_H_
-
-#include "pubsub/publisher.h"
-#include "pubsub_endpoint.h"
-#include "pubsub_common.h"
-
-#include "pubsub_serializer.h"
-
-#define UDP_BASE_PORT  49152
-#define UDP_MAX_PORT   65000
-
-typedef struct pubsub_udp_msg {
-    struct pubsub_msg_header header;
-    unsigned int payloadSize;
-    char payload[];
-} pubsub_udp_msg_t;
-
-typedef struct topic_publication *topic_publication_pt;
-celix_status_t pubsub_topicPublicationCreate(int sendSocket, 
celix_properties_t *pubEP, pubsub_serializer_service_t *best_serializer, const 
char* best_serializer_type, char* bindIP, topic_publication_pt *out);
-celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub);
-
-celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub, 
celix_properties_t *ep);
-celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt 
pub, celix_properties_t *ep);
-
-celix_status_t pubsub_topicPublicationStart(bundle_context_pt 
bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory);
-celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub);
-
-array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt 
pub);
-
-#endif /* TOPIC_PUBLICATION_H_ */

Reply via email to