http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
----------------------------------------------------------------------
diff --cc pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
index 94a8e11,0000000..e3e9704
mode 100644,000000..100644
--- a/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
+++ b/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c
@@@ -1,457 -1,0 +1,460 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +
 +#include <stdio.h>
 +#include <string.h>
 +#include <stdlib.h>
 +#include <unistd.h>
 +#include <stdbool.h>
 +#include <netdb.h>
 +#include <netinet/in.h>
 +
 +#include "constants.h"
 +#include "celix_threads.h"
 +#include "bundle_context.h"
 +#include "array_list.h"
 +#include "utils.h"
 +#include "celix_errno.h"
 +#include "filter.h"
 +#include "service_reference.h"
 +#include "service_registration.h"
 +
 +#include "publisher_endpoint_announce.h"
 +#include "etcd_common.h"
 +#include "etcd_watcher.h"
 +#include "etcd_writer.h"
 +#include "pubsub_endpoint.h"
 +#include "pubsub_discovery_impl.h"
 +
 +/* Discovery activator functions */
 +/**
 + * Allocates and initialises a pubsub discovery instance.
 + *
 + * The instance is zero-initialised, bound to the given bundle context and
 + * given three empty hash maps (discovered publishers, listener references,
 + * watchers) plus one mutex per map.
 + *
 + * @param context      Bundle context stored in the new instance.
 + * @param ps_discovery Output; receives the new instance, or NULL when the
 + *                     allocation fails.
 + * @return CELIX_SUCCESS, or CELIX_ENOMEM when the allocation fails.
 + */
 +celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_pt *ps_discovery) {
 +      pubsub_discovery_pt disc = calloc(1, sizeof(*disc));
 +
 +      /* Publish the result first so the caller sees NULL on failure. */
 +      *ps_discovery = disc;
 +      if (disc == NULL) {
 +              return CELIX_ENOMEM;
 +      }
 +
 +      disc->context = context;
 +
 +      disc->discoveredPubs = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 +      disc->listenerReferences = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL);
 +      disc->watchers = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 +
 +      celixThreadMutex_create(&disc->listenerReferencesMutex, NULL);
 +      celixThreadMutex_create(&disc->discoveredPubsMutex, NULL);
 +      celixThreadMutex_create(&disc->watchersMutex, NULL);
 +
 +      return CELIX_SUCCESS;
 +}
 +
 +/**
 + * Releases everything owned by a pubsub discovery instance.
 + *
 + * Every endpoint stored in the discovered-publishers lists is destroyed,
 + * then the lists and the map itself. The listener-references map is
 + * destroyed next, after which all three mutexes created by
 + * pubsub_discovery_create() are destroyed and the instance is freed.
 + *
 + * The watchers map is not touched here; it is destroyed by
 + * pubsub_discovery_stop().
 + *
 + * @param ps_discovery Instance to destroy; invalid after the call.
 + * @return Always CELIX_SUCCESS.
 + */
 +celix_status_t pubsub_discovery_destroy(pubsub_discovery_pt ps_discovery) {
 +      celix_status_t status = CELIX_SUCCESS;
 +
 +      celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex);
 +
 +      hash_map_iterator_pt iter = hashMapIterator_create(ps_discovery->discoveredPubs);
 +
 +      while (hashMapIterator_hasNext(iter)) {
 +              array_list_pt pubEP_list = (array_list_pt) hashMapIterator_nextValue(iter);
 +
 +              for(int i=0; i < arrayList_size(pubEP_list); i++) {
 +                      pubsubEndpoint_destroy(((pubsub_endpoint_pt)arrayList_get(pubEP_list,i)));
 +              }
 +              arrayList_destroy(pubEP_list);
 +      }
 +
 +      hashMapIterator_destroy(iter);
 +
 +      /* The lists were destroyed above, so only the keys are handed to the map
 +       * for release (flags: true, false). */
 +      hashMap_destroy(ps_discovery->discoveredPubs, true, false);
 +      ps_discovery->discoveredPubs = NULL;
 +
 +      celixThreadMutex_unlock(&ps_discovery->discoveredPubsMutex);
 +
 +      celixThreadMutex_destroy(&ps_discovery->discoveredPubsMutex);
 +
 +
 +      celixThreadMutex_lock(&ps_discovery->listenerReferencesMutex);
 +
 +      hashMap_destroy(ps_discovery->listenerReferences, false, false);
 +      ps_discovery->listenerReferences = NULL;
 +
 +      celixThreadMutex_unlock(&ps_discovery->listenerReferencesMutex);
 +
 +      celixThreadMutex_destroy(&ps_discovery->listenerReferencesMutex);
 +
 +      /* watchersMutex is created in pubsub_discovery_create() as well; without
 +       * this call it was never released. */
 +      celixThreadMutex_destroy(&ps_discovery->watchersMutex);
 +
 +      free(ps_discovery);
 +
 +      return status;
 +}
 +
 +/**
 + * Starts discovery: calls etcdCommon_init() with the instance's bundle
 + * context and then creates the etcd writer.
 + *
 + * The writer is created regardless of the outcome of etcdCommon_init().
 + *
 + * @param ps_discovery Discovery instance to start.
 + * @return The status reported by etcdCommon_init().
 + */
 +celix_status_t pubsub_discovery_start(pubsub_discovery_pt ps_discovery) {
 +    const celix_status_t initStatus = etcdCommon_init(ps_discovery->context);
 +
 +    ps_discovery->writer = etcdWriter_create(ps_discovery);
 +
 +    return initStatus;
 +}
 +
 +/**
 + * Stops discovery.
 + *
 + * Steps, in order:
 + *  1. stop every etcd watcher;
 + *  2. walk the discovered publishers: endpoints whose framework UUID equals
 + *     the local one are deleted through the etcd writer (and stay in their
 + *     list); all other endpoints are reported as removed to the listeners,
 + *     taken out of their list and destroyed;
 + *  3. destroy the etcd writer;
 + *  4. destroy every watcher and then the watchers map.
 + *
 + * watchersMutex is held for the whole sequence; discoveredPubsMutex is held
 + * additionally during step 2.
 + *
 + * NOTE(review): ps_discovery->watchers still points at the destroyed map when
 + * this function returns - confirm nothing uses it after stop.
 + *
 + * @param ps_discovery Discovery instance to stop.
 + * @return CELIX_SUCCESS, or CELIX_INVALID_BUNDLE_CONTEXT when the framework
 + *         UUID cannot be read (nothing is stopped in that case).
 + */
 +celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) {
 +    celix_status_t status = CELIX_SUCCESS;
 +
 +    const char* fwUUID = NULL;
 +
 +    bundleContext_getProperty(ps_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
 +    if (fwUUID == NULL) {
 +        printf("PSD: Cannot retrieve fwUUID.\n");
 +        return CELIX_INVALID_BUNDLE_CONTEXT;
 +    }
 +
 +    celixThreadMutex_lock(&ps_discovery->watchersMutex);
 +
 +    hash_map_iterator_pt iter = hashMapIterator_create(ps_discovery->watchers);
 +    while (hashMapIterator_hasNext(iter)) {
 +        struct watcher_info * wi = hashMapIterator_nextValue(iter);
 +        etcdWatcher_stop(wi->watcher);
 +    }
 +    hashMapIterator_destroy(iter);
 +
 +    celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex);
 +
 +    /* Delete the local framework's publishers from ETCD; drop every other
 +     * publisher and tell the listeners it is gone. */
 +
 +    iter = hashMapIterator_create(ps_discovery->discoveredPubs);
 +    while (hashMapIterator_hasNext(iter)) {
 +        array_list_pt pubEP_list = (array_list_pt) hashMapIterator_nextValue(iter);
 +
 +        int i;
 +        for (i = 0; i < arrayList_size(pubEP_list); i++) {
 +            pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt) arrayList_get(pubEP_list, i);
 +            /* The UUID property is only stored when a UUID was supplied, so the
 +             * lookup can yield NULL; such an endpoint is treated as not local
 +             * instead of being passed to strcmp(). */
 +            const char* epUUID = properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID);
-             if (strcmp(pubEP->frameworkUUID, fwUUID) == 0) {
 +            if (epUUID != NULL && strcmp(epUUID, fwUUID) == 0) {
 +                etcdWriter_deletePublisherEndpoint(ps_discovery->writer, pubEP);
 +            } else {
 +                pubsub_discovery_informPublishersListeners(ps_discovery, pubEP, false);
 +                arrayList_remove(pubEP_list, i);
 +                pubsubEndpoint_destroy(pubEP);
 +                /* The next element moved into slot i; revisit that index. */
 +                i--;
 +            }
 +        }
 +    }
 +
 +    hashMapIterator_destroy(iter);
 +
 +    celixThreadMutex_unlock(&ps_discovery->discoveredPubsMutex);
 +    etcdWriter_destroy(ps_discovery->writer);
 +
 +    iter = hashMapIterator_create(ps_discovery->watchers);
 +    while (hashMapIterator_hasNext(iter)) {
 +        struct watcher_info * wi = hashMapIterator_nextValue(iter);
 +        etcdWatcher_destroy(wi->watcher);
 +    }
 +    hashMapIterator_destroy(iter);
 +    hashMap_destroy(ps_discovery->watchers, true, true);
 +    celixThreadMutex_unlock(&ps_discovery->watchersMutex);
 +    return status;
 +}
 +
 +/* Functions called by the etcd_watcher */
 +
 +/**
 + * Records a publisher endpoint reported by an etcd watcher.
 + *
 + * The endpoint is filed under its scope/topic key. When an equal endpoint
 + * (per pubsubEndpoint_equals) is already stored, pubEP is destroyed and
 + * nothing else happens; otherwise pubEP is stored and the listeners are
 + * informed of the addition after the lock has been released.
 + *
 + * @param pubsub_discovery Discovery instance.
 + * @param pubEP            Endpoint to add; stored in the list or destroyed
 + *                         by this call.
 + * @return CELIX_SUCCESS when the endpoint was already known, otherwise the
 + *         status of pubsub_discovery_informPublishersListeners().
 + */
 +celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP) {
 +      bool isNew = true;
 +
 +      celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
 +
 +      const char *scope = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE);
 +      const char *topic = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC);
-       char *pubs_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
 +      char *key = createScopeTopicKey(scope, topic);
 +      array_list_pt endpoints = hashMap_get(pubsub_discovery->discoveredPubs, key);
 +
 +      if (endpoints == NULL) {
 +              /* First endpoint for this scope/topic: start a new list. The map
 +               * gets its own copy of the key. */
 +              arrayList_create(&endpoints);
 +              arrayList_add(endpoints, pubEP);
 +              hashMap_put(pubsub_discovery->discoveredPubs, strdup(key), endpoints);
 +      } else {
 +              bool known = false;
 +              for (int idx = 0; !known && idx < arrayList_size(endpoints); idx++) {
 +                      known = pubsubEndpoint_equals(pubEP, (pubsub_endpoint_pt) arrayList_get(endpoints, idx));
 +              }
 +
 +              if (known) {
 +                      pubsubEndpoint_destroy(pubEP);
 +                      isNew = false;
 +              } else {
 +                      arrayList_add(endpoints, pubEP);
 +              }
 +      }
 +      free(key);
 +
 +      celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
 +
 +      if (!isNew) {
 +              return CELIX_SUCCESS;
 +      }
 +
 +      return pubsub_discovery_informPublishersListeners(pubsub_discovery, pubEP, true);
 +}
 +
 +/**
 + * Drops a publisher endpoint that an etcd watcher reported as gone.
 + *
 + * The stored endpoint equal to pubEP (per pubsubEndpoint_equals) is removed
 + * from its scope/topic list and destroyed; if one was found, the listeners
 + * are informed of the removal after the lock has been released. pubEP itself
 + * is always destroyed before returning.
 + *
 + * @param pubsub_discovery Discovery instance.
 + * @param pubEP            Endpoint describing what to remove; destroyed by
 + *                         this call.
 + * @return CELIX_ILLEGAL_STATE when no list exists for the scope/topic, the
 + *         status of pubsub_discovery_informPublishersListeners() when an
 + *         endpoint was removed, CELIX_SUCCESS otherwise.
 + */
 +celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP) {
 +    celix_status_t status = CELIX_SUCCESS;
 +    bool removed = false;
 +
 +    celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
 +
 +    const char *scope = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE);
 +    const char *topic = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC);
-     char *pubs_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
 +    char *key = createScopeTopicKey(scope, topic);
 +    array_list_pt endpoints = hashMap_get(pubsub_discovery->discoveredPubs, key);
 +    free(key);
 +
 +    if (endpoints != NULL) {
 +        int idx = 0;
 +
 +        while (!removed && idx < arrayList_size(endpoints)) {
 +            pubsub_endpoint_pt candidate = arrayList_get(endpoints, idx);
 +
 +            if (pubsubEndpoint_equals(pubEP, candidate)) {
 +                arrayList_remove(endpoints, idx);
 +                pubsubEndpoint_destroy(candidate);
 +                removed = true;
 +            }
 +            idx++;
 +        }
 +    } else {
-         printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n", pubEP->topic);
 +        printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n", topic);
 +        status = CELIX_ILLEGAL_STATE;
 +    }
 +
 +    celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
 +
 +    if (removed) {
 +        status = pubsub_discovery_informPublishersListeners(pubsub_discovery, pubEP, false);
 +    }
 +    pubsubEndpoint_destroy(pubEP);
 +
 +    return status;
 +}
 +
 +/* Callback to the pubsub_topology_manager */
 +/**
 + * Tells every registered listener that a publisher endpoint was added or
 + * removed.
 + *
 + * For each service reference in listenerReferences the service is fetched
 + * through the bundle context, its announcePublisher (epAdded == true) or
 + * removePublisher (epAdded == false) callback is invoked with pubEP, and the
 + * service is released again. A reference whose service cannot be obtained is
 + * skipped. listenerReferencesMutex is held for the whole iteration.
 + *
 + * @param pubsub_discovery Discovery instance.
 + * @param pubEP            Endpoint passed to the listener callbacks.
 + * @param epAdded          true to announce the endpoint, false to remove it.
 + * @return Always CELIX_SUCCESS; callback results are not propagated.
 + */
 +celix_status_t pubsub_discovery_informPublishersListeners(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP, bool epAdded) {
 +      celix_status_t status = CELIX_SUCCESS;
 +
 +      // Inform listeners of new publisher endpoint
 +      celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex);
 +
 +      if (pubsub_discovery->listenerReferences != NULL) {
 +              hash_map_iterator_pt iter = hashMapIterator_create(pubsub_discovery->listenerReferences);
 +              while (hashMapIterator_hasNext(iter)) {
 +                      service_reference_pt reference = hashMapIterator_nextKey(iter);
 +
 +                      publisher_endpoint_announce_pt listener = NULL;
 +
 +                      celix_status_t getStatus = bundleContext_getService(pubsub_discovery->context, reference, (void**) &listener);
 +                      if (getStatus != CELIX_SUCCESS) {
 +                              /* Nothing was acquired, so there is nothing to call or release. */
 +                              continue;
 +                      }
 +
 +                      /* Guard against a lookup that succeeds without yielding a service. */
 +                      if (listener != NULL) {
 +                              if (epAdded) {
 +                                      listener->announcePublisher(listener->handle, pubEP);
 +                              } else {
 +                                      listener->removePublisher(listener->handle, pubEP);
 +                              }
 +                      }
 +                      bundleContext_ungetService(pubsub_discovery->context, reference, NULL);
 +              }
 +              hashMapIterator_destroy(iter);
 +      }
 +
 +      celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex);
 +
 +      return status;
 +}
 +
 +
 +/* Service's functions implementation */
 +/**
 + * Service callback: announces a local publisher endpoint.
 + *
 + * A clone of pubEP is stored in the discovered-publishers list for its
 + * scope/topic key (the list is created on first use) and handed to the etcd
 + * writer. The caller keeps ownership of pubEP.
 + *
 + * @param handle The pubsub_discovery_pt instance.
 + * @param pubEP  Endpoint to announce; not modified and not retained.
 + * @return The status of etcdWriter_addPublisherEndpoint(), or the clone
 + *         failure status (CELIX_ENOMEM when the clone reports success but
 + *         yields no endpoint); in the failure case nothing is stored.
 + */
 +celix_status_t pubsub_discovery_announcePublisher(void *handle, pubsub_endpoint_pt pubEP) {
 +      celix_status_t status = CELIX_SUCCESS;
-       printf("pubsub_discovery_announcePublisher : %s / %s\n", pubEP->topic, pubEP->endpoint);
++      printf("pubsub_discovery_announcePublisher : %s / %s\n",
++                 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
++                 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +      pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
 +
 +      /* Clone before touching shared state: a failed clone must not end up as
 +       * a NULL entry in the list or be passed to the etcd writer. */
 +      pubsub_endpoint_pt p = NULL;
 +      status = pubsubEndpoint_clone(pubEP, &p);
 +      if (status != CELIX_SUCCESS || p == NULL) {
 +              return (status != CELIX_SUCCESS) ? status : CELIX_ENOMEM;
 +      }
 +
 +      celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex);
 +
-       char *pub_key = createScopeTopicKey(pubEP->scope,pubEP->topic);
++      char *pub_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +      array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key);
 +
 +      if(pubEP_list==NULL){
 +              arrayList_create(&pubEP_list);
 +              /* The map keeps its own copy of the key. */
 +              hashMap_put(pubsub_discovery->discoveredPubs,strdup(pub_key),pubEP_list);
 +      }
 +      free(pub_key);
 +
 +      arrayList_add(pubEP_list,p);
 +
 +      status = etcdWriter_addPublisherEndpoint(pubsub_discovery->writer,p,true);
 +
 +      celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex);
 +
 +      return status;
 +}
 +
 +/**
 + * Service callback: withdraws a previously announced publisher endpoint.
 + *
 + * The stored endpoint equal to pubEP (per pubsubEndpoint_equals) is removed
 + * from its scope/topic list, deleted through the etcd writer and destroyed.
 + * pubEP itself is left untouched.
 + *
 + * @param handle The pubsub_discovery_pt instance.
 + * @param pubEP  Endpoint describing what to remove.
 + * @return The status of etcdWriter_deletePublisherEndpoint(), or
 + *         CELIX_ILLEGAL_STATE when no list or no equal endpoint exists.
 + */
 +celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt pubEP) {
 +      pubsub_discovery_pt discovery = (pubsub_discovery_pt) handle;
 +      celix_status_t result = CELIX_SUCCESS;
 +
 +      celixThreadMutex_lock(&discovery->discoveredPubsMutex);
 +
 +      const char *scope = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE);
 +      const char *topic = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC);
-       char *pub_key = createScopeTopicKey(pubEP->scope,pubEP->topic);
 +      char *key = createScopeTopicKey(scope, topic);
 +      array_list_pt endpoints = (array_list_pt) hashMap_get(discovery->discoveredPubs, key);
 +      free(key);
 +
 +      if (endpoints != NULL) {
 +              pubsub_endpoint_pt stored = NULL;
 +              bool matched = false;
 +              int idx = 0;
 +
 +              /* Stop at the first stored endpoint that equals pubEP. */
 +              while (!matched && idx < arrayList_size(endpoints)) {
 +                      stored = (pubsub_endpoint_pt) arrayList_get(endpoints, idx);
 +                      matched = pubsubEndpoint_equals(pubEP, stored);
 +                      idx++;
 +              }
 +
 +              if (matched) {
 +                      arrayList_removeElement(endpoints, stored);
 +                      result = etcdWriter_deletePublisherEndpoint(discovery->writer, stored);
 +                      pubsubEndpoint_destroy(stored);
 +              } else {
 +                      printf("PSD: Trying to remove a not existing endpoint. Something is not consistent.\n");
 +                      result = CELIX_ILLEGAL_STATE;
 +              }
 +      } else {
-               printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",pubEP->topic);
 +              printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n", topic);
 +              result = CELIX_ILLEGAL_STATE;
 +      }
 +
 +      celixThreadMutex_unlock(&discovery->discoveredPubsMutex);
 +
 +      return result;
 +}
 +
 +/**
 + * Service callback: registers interest in a scope/topic pair.
 + *
 + * If a watcher already exists for the scope/topic key, its reference count
 + * is incremented. Otherwise a watcher_info with a new etcd watcher is
 + * created, given a reference count of one and stored in the watchers map,
 + * which then owns the key string.
 + *
 + * @param handle The pubsub_discovery_pt instance.
 + * @param scope  Scope of interest.
 + * @param topic  Topic of interest.
 + * @return CELIX_SUCCESS; CELIX_ENOMEM when the watcher_info cannot be
 + *         allocated; or the failure status of etcdWatcher_create(). On
 + *         failure nothing is added to the watchers map.
 + */
 +celix_status_t pubsub_discovery_interestedInTopic(void *handle, const char* scope, const char* topic) {
 +    pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
 +    celix_status_t status = CELIX_SUCCESS;
 +
 +    char *scope_topic_key = createScopeTopicKey(scope, topic);
 +    celixThreadMutex_lock(&pubsub_discovery->watchersMutex);
 +    struct watcher_info * wi = hashMap_get(pubsub_discovery->watchers, scope_topic_key);
 +    if(wi) {
 +        wi->nr_references++;
 +        free(scope_topic_key);
 +    } else {
 +        wi = calloc(1, sizeof(*wi));
 +        if (wi == NULL) {
 +            status = CELIX_ENOMEM;
 +        } else {
 +            status = etcdWatcher_create(pubsub_discovery, pubsub_discovery->context, scope, topic, &wi->watcher);
 +        }
 +
 +        if (status == CELIX_SUCCESS) {
 +            wi->nr_references = 1;
 +            /* The map takes ownership of scope_topic_key. */
 +            hashMap_put(pubsub_discovery->watchers, scope_topic_key, wi);
 +        } else {
 +            /* Do not register a watcher_info without a usable watcher. */
 +            free(wi);
 +            free(scope_topic_key);
 +        }
 +    }
 +
 +    celixThreadMutex_unlock(&pubsub_discovery->watchersMutex);
 +
 +    return status;
 +}
 +
 +/**
 + * Service callback: withdraws interest in a scope/topic pair.
 + *
 + * The reference count of the watcher stored under the scope/topic key is
 + * decremented. When it reaches zero the entry is removed from the watchers
 + * map, its key is freed, and the watcher is stopped, destroyed and freed.
 + * An unknown scope/topic is reported on stderr.
 + *
 + * @param handle The pubsub_discovery_pt instance.
 + * @param scope  Scope no longer of interest.
 + * @param topic  Topic no longer of interest.
 + * @return Always CELIX_SUCCESS.
 + */
 +celix_status_t pubsub_discovery_uninterestedInTopic(void *handle, const char* scope, const char* topic) {
 +    pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle;
 +
 +    char *scope_topic_key = createScopeTopicKey(scope, topic);
 +    celixThreadMutex_lock(&pubsub_discovery->watchersMutex);
 +
 +    hash_map_entry_pt entry =  hashMap_getEntry(pubsub_discovery->watchers, scope_topic_key);
 +    if(entry) {
 +        struct watcher_info * wi = hashMapEntry_getValue(entry);
 +        wi->nr_references--;
 +        if(wi->nr_references == 0) {
 +            /* Fetch the map's own key before the entry disappears. */
 +            char *key = hashMapEntry_getKey(entry);
 +            hashMap_remove(pubsub_discovery->watchers, scope_topic_key);
 +            free(key);
 +            etcdWatcher_stop(wi->watcher);
 +            etcdWatcher_destroy(wi->watcher);
 +            free(wi);
 +        }
 +    } else {
 +        fprintf(stderr, "[DISC] Inconsistency error: Removing unknown topic %s\n", topic);
 +    }
 +    celixThreadMutex_unlock(&pubsub_discovery->watchersMutex);
 +
 +    /* The lookup key is a temporary on every path; it used to leak whenever
 +     * the entry was missing or still referenced. */
 +    free(scope_topic_key);
 +
 +    return CELIX_SUCCESS;
 +}
 +
 +/* pubsub_topology_manager tracker callbacks */
 +
 +/**
 + * Tracker callback: a publisher-announce listener service appeared.
 + *
 + * Every endpoint currently held in discoveredPubs is announced to the new
 + * listener, after which the service reference is recorded in
 + * listenerReferences. Both discoveredPubsMutex and listenerReferencesMutex
 + * are held while this happens.
 + *
 + * @param handle    The pubsub_discovery_pt instance.
 + * @param reference Service reference of the listener; used as map key.
 + * @param service   The publisher_endpoint_announce_pt listener service.
 + * @return The sum of the statuses returned by the announcePublisher calls.
 + */
 +celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, service_reference_pt reference, void * service) {
 +      pubsub_discovery_pt discovery = (pubsub_discovery_pt) handle;
 +      publisher_endpoint_announce_pt announcer = (publisher_endpoint_announce_pt) service;
 +      celix_status_t result = CELIX_SUCCESS;
 +
 +      celixThreadMutex_lock(&discovery->discoveredPubsMutex);
 +      celixThreadMutex_lock(&discovery->listenerReferencesMutex);
 +
 +      /* Replay the already discovered publisher endpoints to the new listener. */
 +      hash_map_iterator_pt mapIter = hashMapIterator_create(discovery->discoveredPubs);
 +      while (hashMapIterator_hasNext(mapIter)) {
 +              array_list_pt endpoints = (array_list_pt) hashMapIterator_nextValue(mapIter);
 +              int idx = 0;
 +
 +              while (idx < arrayList_size(endpoints)) {
 +                      pubsub_endpoint_pt known = (pubsub_endpoint_pt) arrayList_get(endpoints, idx);
 +                      result += announcer->announcePublisher(announcer->handle, known);
 +                      idx++;
 +              }
 +      }
 +      hashMapIterator_destroy(mapIter);
 +
 +      hashMap_put(discovery->listenerReferences, reference, NULL);
 +
 +      celixThreadMutex_unlock(&discovery->listenerReferencesMutex);
 +      celixThreadMutex_unlock(&discovery->discoveredPubsMutex);
 +
 +      printf("PSD: pubsub_tm_announce_publisher added.\n");
 +
 +      return result;
 +}
 +
 +/**
 + * Tracker callback: a publisher-announce listener service was modified.
 + *
 + * Handled as a removal followed by an addition; the addition is skipped
 + * when the removal does not succeed.
 + *
 + * @param handle    The pubsub_discovery_pt instance.
 + * @param reference Service reference of the listener.
 + * @param service   The listener service.
 + * @return The removal status when it is not CELIX_SUCCESS, otherwise the
 + *         status of the addition.
 + */
 +celix_status_t pubsub_discovery_tmPublisherAnnounceModified(void * handle, service_reference_pt reference, void * service) {
 +      const celix_status_t removeStatus = pubsub_discovery_tmPublisherAnnounceRemoved(handle, reference, service);
 +
 +      if (removeStatus != CELIX_SUCCESS) {
 +              return removeStatus;
 +      }
 +
 +      return pubsub_discovery_tmPublisherAnnounceAdded(handle, reference, service);
 +}
 +
 +/**
 + * Tracker callback: a publisher-announce listener service went away.
 + *
 + * The service reference is removed from listenerReferences under
 + * listenerReferencesMutex. A message is printed when the removal call
 + * returns a non-NULL value.
 + *
 + * @param handle    The pubsub_discovery_pt instance.
 + * @param reference Service reference of the listener to forget.
 + * @param service   Unused.
 + * @return Always CELIX_SUCCESS.
 + */
 +celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, service_reference_pt reference, void * service) {
 +      pubsub_discovery_pt discovery = handle;
 +
 +      celixThreadMutex_lock(&discovery->listenerReferencesMutex);
 +
 +      if (discovery->listenerReferences != NULL && hashMap_remove(discovery->listenerReferences, reference)) {
 +              printf("PSD: pubsub_tm_announce_publisher removed.\n");
 +      }
 +
 +      celixThreadMutex_unlock(&discovery->listenerReferencesMutex);
 +
 +      return CELIX_SUCCESS;
 +}
 +

http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_spi/include/pubsub_endpoint.h
----------------------------------------------------------------------
diff --cc pubsub/pubsub_spi/include/pubsub_endpoint.h
index 4c39d2f,0000000..598d673
mode 100644,000000..100644
--- a/pubsub/pubsub_spi/include/pubsub_endpoint.h
+++ b/pubsub/pubsub_spi/include/pubsub_endpoint.h
@@@ -1,58 -1,0 +1,65 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +/*
 + * pubsub_endpoint.h
 + *
 + *  \date       Sep 21, 2015
 + *  \author           <a href="mailto:d...@celix.apache.org";>Apache Celix 
Project Team</a>
 + *  \copyright        Apache License, Version 2.0
 + */
 +
 +#ifndef PUBSUB_ENDPOINT_H_
 +#define PUBSUB_ENDPOINT_H_
 +
 +#include "service_reference.h"
 +#include "listener_hook_service.h"
 +#include "properties.h"
 +
 +#include "pubsub/publisher.h"
 +#include "pubsub/subscriber.h"
 +
++#define PUBSUB_ENDPOINT_ID              "pubsub.endpoint.id"
++#define PUBSUB_ENDPOINT_SERVICE_ID      "service.id"
++#define PUBSUB_ENDPOINT_SERIALIZER      "serializer"
++#define PUBSUB_ENDPOINT_ADMIN_TYPE      "pubsub.admin.type"
++#define PUBSUB_ENDPOINT_URL             "pubsub.endpoint"
++#define PUBSUB_ENDPOINT_TOPIC           "pubsub.topic"
++#define PUBSUB_ENDPOINT_SCOPE           "pubsub.scope"
++#define PUBSUB_ENDPOINT_TYPE            "pubsub.type"
++
 +/*
 + * Description of one publisher or subscriber endpoint.
 + *
 + * The identifying attributes (endpoint id, framework UUID, scope, topic and
 + * URL) are stored as string properties in endpoint_props, under the
 + * PUBSUB_ENDPOINT_* keys and OSGI_FRAMEWORK_FRAMEWORK_UUID; they replace the
 + * individual string members this struct used to have.
 + */
 +struct pubsub_endpoint {
-     char *frameworkUUID;
-     char *scope;
-     char *topic;
-     long serviceID;
-     char* endpoint;
-     bool is_secure;
 +    /* Service id handed to pubsubEndpoint_create(); copied by pubsubEndpoint_clone(). */
++    long serviceID;         //optional
 +    /* Copied by pubsubEndpoint_clone(); not set by pubsubEndpoint_create(). */
++    bool is_secure;         //optional
 +    /* Endpoint attributes as key/value strings; a property is absent when the
 +     * corresponding value was not supplied at creation. */
++    properties_pt endpoint_props;
 +    /* Topic properties supplied at creation; may be NULL. */
 +    properties_pt topic_props;
 +};
 +
 +typedef struct pubsub_endpoint *pubsub_endpoint_pt;
 +
 +celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, 
const char* topic, long serviceId,const char* endpoint,properties_pt 
topic_props,pubsub_endpoint_pt* psEp);
 +celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt 
reference,pubsub_endpoint_pt* psEp, bool isPublisher);
 +celix_status_t 
pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt 
info,pubsub_endpoint_pt* psEp, bool isPublisher);
 +celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt 
*out);
 +celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp);
 +bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2);
++celix_status_t pubsubEndpoint_setField(pubsub_endpoint_pt ep, const char* 
key, const char* value);
 +
 +char *createScopeTopicKey(const char* scope, const char* topic);
 +
 +#endif /* PUBSUB_ENDPOINT_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_spi/src/pubsub_endpoint.c
----------------------------------------------------------------------
diff --cc pubsub/pubsub_spi/src/pubsub_endpoint.c
index c3fd293,0000000..d3b746e
mode 100644,000000..100644
--- a/pubsub/pubsub_spi/src/pubsub_endpoint.c
+++ b/pubsub/pubsub_spi/src/pubsub_endpoint.c
@@@ -1,254 -1,0 +1,282 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +/*
 + * endpoint_description.c
 + *
 + *  \date       25 Jul 2014
 + *  \author     <a href="mailto:d...@celix.apache.org";>Apache Celix Project 
Team</a>
 + *  \copyright  Apache License, Version 2.0
 + */
 +
 +#include <string.h>
 +#include <stdlib.h>
++#include <uuid/uuid.h>
 +
 +#include "celix_errno.h"
 +#include "celix_log.h"
 +
 +#include "pubsub_common.h"
 +#include "pubsub_endpoint.h"
 +#include "constants.h"
 +
 +#include "pubsub_utils.h"
 +
 +
 +static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* 
fwUUID, const char* scope, const char* topic, long serviceId,const char* 
endpoint,properties_pt topic_props, bool cloneProps);
 +static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, 
const char *topic, bool isPublisher);
 +
 +/**
 + * Fills an endpoint from the individual values supplied by the caller.
 + *
 + * endpoint_props is created when the endpoint does not have one yet. A
 + * freshly generated UUID string is stored as PUBSUB_ENDPOINT_ID on every
 + * call. Framework UUID, scope, topic and URL are stored as properties only
 + * when the corresponding argument is not NULL. serviceID is always assigned.
 + *
 + * @param psEp        Endpoint to fill.
 + * @param fwUUID      Framework UUID, or NULL.
 + * @param scope       Scope, or NULL.
 + * @param topic       Topic, or NULL.
 + * @param serviceId   Value for psEp->serviceID.
 + * @param endpoint    Endpoint URL, or NULL.
 + * @param topic_props Topic properties, or NULL to leave psEp->topic_props
 + *                    untouched.
 + * @param cloneProps  true to store a copy of topic_props, false to store
 + *                    the given pointer itself.
 + */
 +static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props, bool cloneProps){
 +      if (psEp->endpoint_props == NULL) {
 +              psEp->endpoint_props = properties_create();
 +      }
 +
 +      /* 36 characters of textual UUID plus the terminating NUL. */
 +      char idText[37];
 +      uuid_t idBinary;
 +
 +      uuid_generate(idBinary);
 +      uuid_unparse(idBinary, idText);
 +      properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_ID, idText);
 +
-               psEp->frameworkUUID = strdup(fwUUID);
-               psEp->scope = strdup(scope);
-               psEp->topic = strdup(topic);
-               psEp->endpoint = strdup(endpoint);
 +      /* Optional string attributes: stored only when a value was supplied. */
 +      const struct {
 +              const char *key;
 +              const char *value;
 +      } optional[] = {
 +              { OSGI_FRAMEWORK_FRAMEWORK_UUID, fwUUID },
 +              { PUBSUB_ENDPOINT_SCOPE, scope },
 +              { PUBSUB_ENDPOINT_TOPIC, topic },
 +              { PUBSUB_ENDPOINT_URL, endpoint },
 +      };
 +
 +      for (size_t idx = 0; idx < sizeof(optional) / sizeof(optional[0]); idx++) {
 +              if (optional[idx].value != NULL) {
 +                      properties_set(psEp->endpoint_props, optional[idx].key, optional[idx].value);
 +              }
 +      }
 +
 +      psEp->serviceID = serviceId;
 +
 +      if (topic_props == NULL) {
 +              return;
 +      }
 +
 +      if (cloneProps) {
 +              properties_copy(topic_props, &(psEp->topic_props));
 +      } else {
 +              psEp->topic_props = topic_props;
 +      }
 +}
 +
 +/**
 + * Loads the topic properties file shipped inside a bundle.
 + *
 + * For a non-system bundle whose root entry can be resolved, the file
 + * "<root>/META-INF/topics/<pub|sub>/<topic>.properties" is loaded; "pub" is
 + * used for publishers and "sub" for subscribers.
 + *
 + * @param bundle      Bundle to read from.
 + * @param topic       Topic name used to build the file name.
 + * @param isPublisher Selects the "pub" (true) or "sub" (false) directory.
 + * @return The loaded properties, or NULL for the system bundle, when the
 + *         bundle root is unavailable, when the path cannot be built, or
 + *         when loading fails.
 + */
 +static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const char *topic, bool isPublisher){
 +
 +      properties_pt topic_props = NULL;
 +
 +      bool isSystemBundle = false;
 +      long bundleId = -1;
 +      bundle_isSystemBundle(bundle, &isSystemBundle);
 +      bundle_getBundleId(bundle,&bundleId);
 +
 +      if(isSystemBundle == false) {
 +
 +              char *bundleRoot = NULL;
 +              char* topicPropertiesPath = NULL;
 +              bundle_getEntry(bundle, ".", &bundleRoot);
 +
 +              if(bundleRoot != NULL){
 +
 +                      /* When asprintf() fails the pointer contents are undefined, so the
 +                       * path is only used and freed after a successful call. */
 +                      if (asprintf(&topicPropertiesPath, "%s/META-INF/topics/%s/%s.properties", bundleRoot, isPublisher?"pub":"sub", topic) >= 0) {
 +                              topic_props = properties_load(topicPropertiesPath);
 +                              free(topicPropertiesPath);
 +                      }
 +
 +                      if(topic_props==NULL){
 +                              printf("PSEP: Could not load properties for %s on topic %s, bundleId=%ld\n", isPublisher?"publication":"subscription", topic,bundleId);
 +                      }
 +
 +                      free(bundleRoot);
 +              }
 +      }
 +
 +      return topic_props;
 +}
 +
++/**
++ * Stores one key/value pair in an endpoint's endpoint_props.
++ *
++ * Nothing is stored when key or value is NULL.
++ *
++ * @param ep    Endpoint to modify.
++ * @param key   Property key, or NULL.
++ * @param value Property value, or NULL.
++ * @return CELIX_ILLEGAL_STATE when the endpoint has no endpoint_props,
++ *         CELIX_SUCCESS otherwise.
++ */
++celix_status_t pubsubEndpoint_setField(pubsub_endpoint_pt ep, const char* key, const char* value) {
++      if (ep->endpoint_props == NULL) {
++              printf("PUBSUB_EP: No endpoint_props for endpoint available!\n");
++              return CELIX_ILLEGAL_STATE;
++      }
++
++      if (key == NULL || value == NULL) {
++              return CELIX_SUCCESS;
++      }
++
++      properties_set(ep->endpoint_props, key, value);
++
++      return CELIX_SUCCESS;
++}
++
 +/**
 + * Creates an endpoint from individual attribute values.
 + *
 + * The endpoint is zero-initialised and then filled through
 + * pubsubEndpoint_setFields() with cloneProps set, so topic_props is copied
 + * and stays owned by the caller.
 + *
 + * @param fwUUID      Framework UUID, or NULL.
 + * @param scope       Scope, or NULL.
 + * @param topic       Topic, or NULL.
 + * @param serviceId   Service id stored in the endpoint.
 + * @param endpoint    Endpoint URL, or NULL.
 + * @param topic_props Topic properties to copy, or NULL.
 + * @param psEp        Output; receives the new endpoint, or NULL when the
 + *                    allocation fails.
 + * @return CELIX_SUCCESS, or CELIX_ENOMEM when the allocation fails.
 + */
 +celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props,pubsub_endpoint_pt* psEp){
 +      celix_status_t status = CELIX_SUCCESS;
 +
 +      *psEp = calloc(1, sizeof(**psEp));
 +
 +      if (*psEp == NULL) {
 +              /* pubsubEndpoint_setFields() dereferences the endpoint. */
 +              status = CELIX_ENOMEM;
 +      } else {
 +              pubsubEndpoint_setFields(*psEp, fwUUID, scope, topic, serviceId, endpoint, topic_props, true);
 +      }
 +
 +      return status;
 +
 +}
 +
 +/**
 + * Creates a copy of an endpoint: the endpoint properties, the topic
 + * properties (when present), the service id and the is_secure flag.
 + *
 + * @param in  endpoint to copy
 + * @param out output; the copy on success, NULL on failure
 + * @return CELIX_SUCCESS, CELIX_ENOMEM when the copy cannot be allocated, or
 + *         the status of the properties_copy call that failed
 + */
 +celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *out){
 +      celix_status_t status = CELIX_SUCCESS;
 +
-       *out = calloc(1,sizeof(**out));
++      pubsub_endpoint_pt ep = calloc(1,sizeof(*ep));
++      if (ep == NULL) {
++              *out = NULL;
++              return CELIX_ENOMEM;
++      }
 +
-       pubsubEndpoint_setFields(*out, in->frameworkUUID, in->scope, in->topic, in->serviceID, in->endpoint, in->topic_props, true);
++      status = properties_copy(in->endpoint_props, &(ep->endpoint_props));
++
++      /* Stop at the first failure so the returned status is a single error code, not a sum of two */
++      if (status == CELIX_SUCCESS && in->topic_props != NULL) {
++              status = properties_copy(in->topic_props, &(ep->topic_props));
++      }
++
++      ep->serviceID = in->serviceID;
++      ep->is_secure = in->is_secure;
++
++      if (status == CELIX_SUCCESS) {
++              *out = ep;
++      } else {
++              pubsubEndpoint_destroy(ep);
++              *out = NULL;
++      }
 +
 +      return status;
 +
 +}
 +
 +/**
 + * Builds an endpoint description from a service reference.
 + *
 + * The framework UUID comes from the context of the bundle owning the
 + * reference; scope, topic and service id come from the reference properties
 + * (the scope falls back to PUBSUB_SUBSCRIBER_SCOPE_DEFAULT). Topic properties
 + * are loaded from the owning bundle.
 + *
 + * @param reference   service reference to describe
 + * @param psEp        output; the new endpoint, or NULL on failure
 + * @param isPublisher selects the publisher or subscriber topic properties
 + * @return CELIX_SUCCESS, CELIX_ENOMEM when the endpoint cannot be allocated,
 + *         or CELIX_BUNDLE_EXCEPTION when framework UUID, service id, scope or
 + *         topic is missing
 + */
 +celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference, pubsub_endpoint_pt* psEp, bool isPublisher){
 +      celix_status_t status = CELIX_SUCCESS;
 +
 +      pubsub_endpoint_pt ep = calloc(1,sizeof(*ep));
 +      if (ep == NULL) {
 +              *psEp = NULL;
 +              return CELIX_ENOMEM;
 +      }
 +
 +      bundle_pt bundle = NULL;
 +      bundle_context_pt ctxt = NULL;
 +      const char* fwUUID = NULL;
 +      serviceReference_getBundle(reference,&bundle);
 +      bundle_getContext(bundle,&ctxt);
 +      bundleContext_getProperty(ctxt,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
 +
 +      const char* scope = NULL;
 +      serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_SCOPE,&scope);
 +
 +      const char* topic = NULL;
 +      serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_TOPIC,&topic);
 +
 +      const char* serviceId = NULL;
 +      serviceReference_getProperty(reference,(char*)OSGI_FRAMEWORK_SERVICE_ID,&serviceId);
 +
 +      /* strtol on a NULL string is undefined; a missing id becomes 0 and is rejected by the check below */
 +      long serviceIdValue = (serviceId != NULL) ? strtol(serviceId,NULL,10) : 0;
 +
 +      /* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */
 +      properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher);
 +
 +      pubsubEndpoint_setFields(ep, fwUUID, scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, serviceIdValue, NULL, topic_props, false);
 +
-       if (!ep->frameworkUUID || !ep->serviceID || !ep->scope || !ep->topic) {
++      if (!properties_get(ep->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID) ||
++                      !ep->serviceID ||
++                      !properties_get(ep->endpoint_props, PUBSUB_ENDPOINT_SCOPE) ||
++                      !properties_get(ep->endpoint_props, PUBSUB_ENDPOINT_TOPIC)) {
++
 +              fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: incomplete description!.");
 +              status = CELIX_BUNDLE_EXCEPTION;
 +              pubsubEndpoint_destroy(ep);
 +              *psEp = NULL;
 +      }
 +      else{
 +              *psEp = ep;
 +      }
 +
 +      return status;
 +
 +}
 +
 +/**
 + * Builds an endpoint description from listener hook info.
 + *
 + * The framework UUID comes from the context in the info, topic and scope are
 + * extracted from its filter (the scope falls back to
 + * PUBSUB_PUBLISHER_SCOPE_DEFAULT) and the bundle id of the context's bundle is
 + * passed as the service id.
 + *
 + * @param info        listener hook info to describe
 + * @param psEp        output; set to the new endpoint on success and to NULL
 + *                    when the allocation fails; not written when the framework
 + *                    UUID or topic is missing
 + * @param isPublisher selects the publisher or subscriber topic properties
 + * @return CELIX_SUCCESS, CELIX_BUNDLE_EXCEPTION when the framework UUID or the
 + *         topic cannot be determined, or CELIX_ENOMEM when allocation fails
 + */
 +celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt info,pubsub_endpoint_pt* psEp, bool isPublisher){
 +      celix_status_t status = CELIX_SUCCESS;
 +
 +      const char* fwUUID=NULL;
 +      bundleContext_getProperty(info->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
 +
 +      if(fwUUID==NULL){
 +              return CELIX_BUNDLE_EXCEPTION;
 +      }
 +
 +      char* topic = pubsub_getTopicFromFilter(info->filter);
 +      if(topic==NULL){
 +              return CELIX_BUNDLE_EXCEPTION;
 +      }
 +
 +      *psEp = calloc(1, sizeof(**psEp));
 +      if(*psEp==NULL){
 +              /* topic is owned by this function, so release it before bailing out */
 +              free(topic);
 +              return CELIX_ENOMEM;
 +      }
 +
 +      char* scope = pubsub_getScopeFromFilter(info->filter);
 +      if(scope == NULL) {
 +              scope = strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT);
 +      }
 +
 +      bundle_pt bundle = NULL;
 +      long bundleId = -1;
 +      bundleContext_getBundle(info->context,&bundle);
 +
 +      bundle_getBundleId(bundle,&bundleId);
 +
 +      properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher);
 +
 +      /* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */
 +      /* scope can still be NULL here if strdup failed, hence the second fallback */
 +      pubsubEndpoint_setFields(*psEp, fwUUID, scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, bundleId, NULL, topic_props, false);
 +
 +      free(topic);
 +      free(scope);
 +
 +
 +      return status;
 +}
 +
 +/**
 + * Releases an endpoint together with its topic and endpoint properties.
 + *
 + * @param psEp endpoint to destroy; NULL is accepted and ignored
 + * @return always CELIX_SUCCESS
 + */
 +celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){
 +
 +      /* Like free(), destroying nothing is a no-op instead of a NULL dereference */
 +      if(psEp == NULL){
 +              return CELIX_SUCCESS;
 +      }
 +
-       if(psEp->frameworkUUID!=NULL){
-               free(psEp->frameworkUUID);
-               psEp->frameworkUUID = NULL;
-       }
- 
-       if(psEp->scope!=NULL){
-               free(psEp->scope);
-               psEp->scope = NULL;
-       }
- 
-       if(psEp->topic!=NULL){
-               free(psEp->topic);
-               psEp->topic = NULL;
-       }
- 
-       if(psEp->endpoint!=NULL){
-               free(psEp->endpoint);
-               psEp->endpoint = NULL;
-       }
- 
 +      if(psEp->topic_props != NULL){
 +              properties_destroy(psEp->topic_props);
 +      }
 +
++      if (psEp->endpoint_props != NULL) {
++              properties_destroy(psEp->endpoint_props);
++      }
++
 +      free(psEp);
 +
 +      return CELIX_SUCCESS;
 +
 +}
 +
 +bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){
 +
-       return ((strcmp(psEp1->frameworkUUID,psEp2->frameworkUUID)==0) &&
-                       (strcmp(psEp1->scope,psEp2->scope)==0) &&
-                       (strcmp(psEp1->topic,psEp2->topic)==0) &&
++      return ((strcmp(properties_get(psEp1->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),properties_get(psEp2->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID))==0) &&
++                      (strcmp(properties_get(psEp1->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE),properties_get(psEp2->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE))==0) &&
++                      (strcmp(properties_get(psEp1->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC),properties_get(psEp2->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC))==0) &&
 +                      (psEp1->serviceID == psEp2->serviceID) /*&&
 +                      ((psEp1->endpoint==NULL && 
psEp2->endpoint==NULL)||(strcmp(psEp1->endpoint,psEp2->endpoint)==0))*/
 +      );
 +}
 +
 +/**
 + * Builds the "<scope>:<topic>" key for a scope/topic pair.
 + *
 + * @param scope scope part of the key
 + * @param topic topic part of the key
 + * @return newly allocated key the caller must free(), or NULL when the key
 + *         cannot be allocated
 + */
 +char *createScopeTopicKey(const char* scope, const char* topic) {
 +      char *result = NULL;
 +
 +      /* asprintf leaves the output pointer undefined on failure, so report failure as NULL */
 +      if (asprintf(&result, "%s:%s", scope, topic) < 0) {
 +              result = NULL;
 +      }
 +
 +      return result;
 +}

http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --cc pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 2ac75c9,0000000..a63b275
mode 100644,000000..100644
--- a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@@ -1,721 -1,0 +1,727 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +/*
 + * pubsub_topology_manager.c
 + *
 + *  \date       Sep 29, 2011
 + *  \author           <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
 + *  \copyright        Apache License, Version 2.0
 + */
 +#include <stdio.h>
 +#include <stdlib.h>
 +#include <string.h>
 +#include <stdbool.h>
 +
 +#include "hash_map.h"
 +#include "array_list.h"
 +#include "bundle_context.h"
 +#include "constants.h"
 +#include "module.h"
 +#include "bundle.h"
 +#include "filter.h"
 +#include "listener_hook_service.h"
 +#include "utils.h"
 +#include "service_reference.h"
 +#include "service_registration.h"
 +#include "log_service.h"
 +#include "log_helper.h"
 +
 +#include "publisher_endpoint_announce.h"
 +#include "pubsub_topology_manager.h"
 +#include "pubsub_endpoint.h"
 +#include "pubsub_admin.h"
 +#include "pubsub_utils.h"
 +
 +
 +/**
 + * Allocates a topology manager and initialises its locks, the PSA list and
 + * the discovery, publication and subscription maps.
 + *
 + * @param context   bundle context stored in the manager
 + * @param logHelper log helper stored in the manager
 + * @param manager   output; the new manager (NULL when allocation fails)
 + * @return CELIX_ENOMEM when the manager cannot be allocated, otherwise the
 + *         status of the first mutex creation that failed, or CELIX_SUCCESS
 + */
 +celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_pt *manager) {
 +      celix_status_t status = CELIX_SUCCESS;
 +      celix_status_t lockStatus = CELIX_SUCCESS;
 +
 +      *manager = calloc(1, sizeof(**manager));
 +      if (!*manager) {
 +              return CELIX_ENOMEM;
 +      }
 +
 +      (*manager)->context = context;
 +
 +      celix_thread_mutexattr_t psaAttr;
 +      celixThreadMutexAttr_create(&psaAttr);
 +      celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE);
 +      status = celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr);
 +      celixThreadMutexAttr_destroy(&psaAttr);
 +
 +      /* Every lock is still created, but a later success must not hide an earlier failure */
 +      lockStatus = celixThreadMutex_create(&(*manager)->publicationsLock, NULL);
 +      if (status == CELIX_SUCCESS) {
 +              status = lockStatus;
 +      }
 +      lockStatus = celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL);
 +      if (status == CELIX_SUCCESS) {
 +              status = lockStatus;
 +      }
 +      lockStatus = celixThreadMutex_create(&(*manager)->discoveryListLock, NULL);
 +      if (status == CELIX_SUCCESS) {
 +              status = lockStatus;
 +      }
 +
 +      arrayList_create(&(*manager)->psaList);
 +
 +      (*manager)->discoveryList = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL);
 +      (*manager)->publications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 +      (*manager)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 +
 +      (*manager)->loghelper = logHelper;
 +
 +      return status;
 +}
 +
 +/* Destroys every endpoint in every list held by the map, then the lists and finally the map itself */
 +static void pubsub_topologyManager_destroyEndpointMap(hash_map_pt endpointMap) {
 +      hash_map_iterator_pt it = hashMapIterator_create(endpointMap);
 +
 +      while (hashMapIterator_hasNext(it)) {
 +              array_list_pt endpoints = (array_list_pt)hashMapIterator_nextValue(it);
 +              for (int idx = 0; idx < arrayList_size(endpoints); idx++) {
 +                      pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(endpoints, idx));
 +              }
 +              arrayList_destroy(endpoints);
 +      }
 +
 +      hashMapIterator_destroy(it);
 +      /* NOTE(review): the true/false flags presumably mean "free keys, keep values" - the lists were destroyed above */
 +      hashMap_destroy(endpointMap, true, false);
 +}
 +
 +/**
 + * Tears down a topology manager: the discovery map, the PSA list, all
 + * publication and subscription endpoints with their maps, the four locks and
 + * finally the manager itself. Each container is destroyed while its own lock
 + * is held; the lock is destroyed right after it is released.
 + *
 + * @param manager manager to destroy
 + * @return always CELIX_SUCCESS
 + */
 +celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager) {
 +      /* Known discoveries */
 +      celixThreadMutex_lock(&manager->discoveryListLock);
 +      hashMap_destroy(manager->discoveryList, false, false);
 +      celixThreadMutex_unlock(&manager->discoveryListLock);
 +      celixThreadMutex_destroy(&manager->discoveryListLock);
 +
 +      /* Known pubsub admins */
 +      celixThreadMutex_lock(&manager->psaListLock);
 +      arrayList_destroy(manager->psaList);
 +      celixThreadMutex_unlock(&manager->psaListLock);
 +      celixThreadMutex_destroy(&manager->psaListLock);
 +
 +      /* Publications */
 +      celixThreadMutex_lock(&manager->publicationsLock);
 +      pubsub_topologyManager_destroyEndpointMap(manager->publications);
 +      celixThreadMutex_unlock(&manager->publicationsLock);
 +      celixThreadMutex_destroy(&manager->publicationsLock);
 +
 +      /* Subscriptions */
 +      celixThreadMutex_lock(&manager->subscriptionsLock);
 +      pubsub_topologyManager_destroyEndpointMap(manager->subscriptions);
 +      celixThreadMutex_unlock(&manager->subscriptionsLock);
 +      celixThreadMutex_destroy(&manager->subscriptionsLock);
 +
 +      free(manager);
 +
 +      return CELIX_SUCCESS;
 +}
 +
 +/**
 + * Service tracker callback for a newly available pubsub admin (PSA).
 + *
 + * The PSA is appended to the PSA list and every subscription and publication
 + * already known to the manager is offered to it.
 + *
 + * @param handle    the topology manager
 + * @param reference service reference of the PSA (unused)
 + * @param service   the pubsub admin service
 + * @return the sum of the statuses returned by the PSA's addSubscription and
 + *         addPublication calls; CELIX_SUCCESS when all succeeded
 + */
 +celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_pt reference, void * service) {
 +      celix_status_t status = CELIX_SUCCESS;
 +      pubsub_topology_manager_pt manager = handle;
 +      int i;
 +
 +      pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service;
 +      logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added PSA");
 +
 +      celixThreadMutex_lock(&manager->psaListLock);
 +      arrayList_add(manager->psaList, psa);
 +      celixThreadMutex_unlock(&manager->psaListLock);
 +
 +      // Add already detected subscriptions to new PSA
 +      celixThreadMutex_lock(&manager->subscriptionsLock);
 +      hash_map_iterator_pt subscriptionsIterator = hashMapIterator_create(manager->subscriptions);
 +
 +      while (hashMapIterator_hasNext(subscriptionsIterator)) {
 +              array_list_pt sub_ep_list = hashMapIterator_nextValue(subscriptionsIterator);
 +              for(i=0;i<arrayList_size(sub_ep_list);i++){
 +                      status += psa->addSubscription(psa->admin, (pubsub_endpoint_pt)arrayList_get(sub_ep_list,i));
 +              }
 +      }
 +
 +      hashMapIterator_destroy(subscriptionsIterator);
 +
 +      celixThreadMutex_unlock(&manager->subscriptionsLock);
 +
 +      // Add already detected publications to new PSA
 +      /* The lock result is deliberately not stored in status: that would discard the subscription errors summed above */
 +      celixThreadMutex_lock(&manager->publicationsLock);
 +      hash_map_iterator_pt publicationsIterator = hashMapIterator_create(manager->publications);
 +
 +      while (hashMapIterator_hasNext(publicationsIterator)) {
 +              array_list_pt pub_ep_list = hashMapIterator_nextValue(publicationsIterator);
 +              for(i=0;i<arrayList_size(pub_ep_list);i++){
 +                      status += psa->addPublication(psa->admin, (pubsub_endpoint_pt)arrayList_get(pub_ep_list,i));
 +              }
 +      }
 +
 +      hashMapIterator_destroy(publicationsIterator);
 +
 +      celixThreadMutex_unlock(&manager->publicationsLock);
 +
 +      return status;
 +}
 +
 +/**
 + * Service tracker callback for a modified pubsub admin; nothing has to be done.
 + *
 + * @return always CELIX_SUCCESS
 + */
 +celix_status_t pubsub_topologyManager_psaModified(void * handle, service_reference_pt reference, void * service) {
 +      /* Intentionally a no-op */
 +      return CELIX_SUCCESS;
 +}
 +
 +/* Splits a "<scope>:<topic>" key at its first ':' into two bounded, always NUL-terminated buffers */
 +static void pubsub_topologyManager_splitScopeTopicKey(const char* key, char* scope, size_t scopeSize, char* topic, size_t topicSize) {
 +      const char* separator = strchr(key, ':');
 +      size_t scopeLen = (separator != NULL) ? (size_t)(separator - key) : strlen(key);
 +
 +      /* snprintf truncates instead of writing past the end of the destination */
 +      snprintf(scope, scopeSize, "%.*s", (int)scopeLen, key);
 +      snprintf(topic, topicSize, "%s", (separator != NULL) ? separator + 1 : "");
 +}
 +
 +/**
 + * Service tracker callback for a pubsub admin (PSA) that goes away.
 + *
 + * For every known scope/topic the PSA is told to close all publications; when
 + * that succeeds, the publications of this framework are withdrawn from every
 + * known discovery. The PSA is then told to close all subscriptions and is
 + * removed from the PSA list.
 + *
 + * @param handle    the topology manager
 + * @param reference service reference of the PSA (unused)
 + * @param service   the pubsub admin service
 + * @return the sum of the statuses returned by the PSA's closeAllPublications
 + *         and closeAllSubscriptions calls; CELIX_SUCCESS when all succeeded
 + */
 +celix_status_t pubsub_topologyManager_psaRemoved(void * handle, service_reference_pt reference, void * service) {
 +      celix_status_t status = CELIX_SUCCESS;
 +      pubsub_topology_manager_pt manager = handle;
 +
 +      pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service;
 +
 +      /* Looked up once instead of once per discovery per topic */
 +      const char* fwUUID = NULL;
 +      bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
 +
 +      /* Deactivate all publications */
 +      celixThreadMutex_lock(&manager->publicationsLock);
 +
 +      hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications);
 +      while(hashMapIterator_hasNext(pubit)){
 +              hash_map_entry_pt pub_entry = hashMapIterator_nextEntry(pubit);
 +              char* scope_topic_key = (char*)hashMapEntry_getKey(pub_entry);
 +              // Extract scope/topic name from key
 +              char scope[MAX_SCOPE_LEN];
 +              char topic[MAX_TOPIC_LEN];
 +              pubsub_topologyManager_splitScopeTopicKey(scope_topic_key, scope, sizeof(scope), topic, sizeof(topic));
 +              array_list_pt pubEP_list = (array_list_pt)hashMapEntry_getValue(pub_entry);
 +
 +              /* Summed like the subscription statuses below, so a failure is not hidden by a later success */
 +              celix_status_t closeStatus = psa->closeAllPublications(psa->admin,scope,topic);
 +              status += closeStatus;
 +
 +              if(closeStatus==CELIX_SUCCESS){
 +                      celixThreadMutex_lock(&manager->discoveryListLock);
 +                      hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
 +                      while(hashMapIterator_hasNext(iter)){
 +                              service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
 +                              publisher_endpoint_announce_pt disc = NULL;
 +                              bundleContext_getService(manager->context, disc_sr, (void**) &disc);
 +                              int i;
 +                              /* disc stays NULL when the service could not be obtained */
 +                              for(i=0;disc!=NULL && i<arrayList_size(pubEP_list);i++){
 +                                      pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
-                                       if(strcmp(pubEP->frameworkUUID,fwUUID)==0){
++                                      const char* epUUID = properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID);
++                                      if(epUUID!=NULL && fwUUID!=NULL && strcmp(epUUID,fwUUID)==0){
 +                                              disc->removePublisher(disc->handle,pubEP);
 +                                      }
 +                              }
 +                              bundleContext_ungetService(manager->context, disc_sr, NULL);
 +                      }
 +                      hashMapIterator_destroy(iter);
 +                      celixThreadMutex_unlock(&manager->discoveryListLock);
 +              }
 +      }
 +      hashMapIterator_destroy(pubit);
 +
 +      celixThreadMutex_unlock(&manager->publicationsLock);
 +
 +      /* Deactivate all subscriptions */
 +      celixThreadMutex_lock(&manager->subscriptionsLock);
 +      hash_map_iterator_pt subit = hashMapIterator_create(manager->subscriptions);
 +      while(hashMapIterator_hasNext(subit)){
 +              // TODO do some error checking
 +              char* scope_topic = (char*)hashMapIterator_nextKey(subit);
 +              char scope[MAX_SCOPE_LEN];
 +              char topic[MAX_TOPIC_LEN];
 +              pubsub_topologyManager_splitScopeTopicKey(scope_topic, scope, sizeof(scope), topic, sizeof(topic));
 +              status += psa->closeAllSubscriptions(psa->admin,scope, topic);
 +      }
 +      hashMapIterator_destroy(subit);
 +      celixThreadMutex_unlock(&manager->subscriptionsLock);
 +
 +      celixThreadMutex_lock(&manager->psaListLock);
 +      arrayList_removeElement(manager->psaList, psa);
 +      celixThreadMutex_unlock(&manager->psaListLock);
 +
 +      logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Removed PSA");
 +
 +      return status;
 +}
 +
 +/**
 + * Service tracker callback for a newly registered subscriber service.
 + *
 + * An endpoint is built from the reference and stored under its scope/topic
 + * key, the best matching pubsub admin (highest score above zero) is asked to
 + * add the subscription, and every known discovery is told about the interest
 + * in the topic.
 + *
 + * @param handle    the topology manager
 + * @param reference service reference of the subscriber
 + * @param service   the subscriber service (unused)
 + * @return CELIX_SUCCESS, or CELIX_INVALID_BUNDLE_CONTEXT when no endpoint can
 + *         be built from the reference
 + */
 +celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_reference_pt reference, void * service) {
 +      celix_status_t status = CELIX_SUCCESS;
 +      pubsub_topology_manager_pt manager = handle;
 +      //subscriber_service_pt subscriber = (subscriber_service_pt)service;
 +
 +      pubsub_endpoint_pt sub = NULL;
 +      if(pubsubEndpoint_createFromServiceReference(reference,&sub,false) == CELIX_SUCCESS){
 +              celixThreadMutex_lock(&manager->subscriptionsLock);
-               char *sub_key = createScopeTopicKey(sub->scope, sub->topic);
++              char *sub_key = createScopeTopicKey(properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +
 +              array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key);
 +              if(sub_list_by_topic==NULL){
 +                      arrayList_create(&sub_list_by_topic);
 +                      /* The map takes over the freshly allocated key; no unchecked strdup followed by free */
 +                      hashMap_put(manager->subscriptions,sub_key,sub_list_by_topic);
 +              }
 +              else{
 +                      free(sub_key);
 +              }
 +              arrayList_add(sub_list_by_topic,sub);
 +
 +              celixThreadMutex_unlock(&manager->subscriptionsLock);
 +
 +              int j;
 +              double score = 0;
 +              double best_score = 0;
 +              pubsub_admin_service_pt best_psa = NULL;
 +              celixThreadMutex_lock(&manager->psaListLock);
 +              for(j=0;j<arrayList_size(manager->psaList);j++){
 +                      pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
 +                      psa->matchEndpoint(psa->admin,sub,&score);
 +                      if(score>best_score){ /* We have a new winner! */
 +                              best_score = score;
 +                              best_psa = psa;
 +                      }
 +              }
 +
 +              if(best_psa != NULL && best_score>0){
 +                      best_psa->addSubscription(best_psa->admin,sub);
 +              }
 +
 +              // Inform discoveries for interest in the topic
 +              celixThreadMutex_lock(&manager->discoveryListLock);
 +              hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
 +              while(hashMapIterator_hasNext(iter)){
 +                      service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
 +                      publisher_endpoint_announce_pt disc = NULL;
 +                      bundleContext_getService(manager->context, disc_sr, (void**) &disc);
-                       disc->interestedInTopic(disc->handle, sub->scope, sub->topic);
++                      /* disc stays NULL when the service could not be obtained */
++                      if(disc != NULL){
++                              disc->interestedInTopic(disc->handle, properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
++                      }
 +                      bundleContext_ungetService(manager->context, disc_sr, NULL);
 +              }
 +              hashMapIterator_destroy(iter);
 +              celixThreadMutex_unlock(&manager->discoveryListLock);
 +
 +              celixThreadMutex_unlock(&manager->psaListLock);
 +      }
 +      else{
 +              status=CELIX_INVALID_BUNDLE_CONTEXT;
 +      }
 +
 +      return status;
 +}
 +
 +/**
 + * Service tracker callback for a modified subscriber service; nothing has to
 + * be done.
 + *
 + * @return always CELIX_SUCCESS
 + */
 +celix_status_t pubsub_topologyManager_subscriberModified(void * handle, service_reference_pt reference, void * service) {
 +      /* Intentionally a no-op */
 +      return CELIX_SUCCESS;
 +}
 +
 +/**
 + * Service tracker callback for a subscriber service that goes away.
 + *
 + * A temporary endpoint is built from the reference to find the stored one.
 + * Every known discovery is told the topic is no longer of interest, then each
 + * stored endpoint equal to the temporary one is removed from all pubsub
 + * admins, taken out of its scope/topic list and destroyed. Endpoints that do
 + * not match stay in the list untouched. When the list becomes empty all
 + * pubsub admins are asked to close the subscriptions for the scope/topic.
 + *
 + * @param handle    the topology manager
 + * @param reference service reference of the subscriber
 + * @param service   the subscriber service (unused)
 + * @return CELIX_SUCCESS, or CELIX_INVALID_BUNDLE_CONTEXT when no endpoint can
 + *         be built from the reference
 + */
 +celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_reference_pt reference, void * service) {
 +      celix_status_t status = CELIX_SUCCESS;
 +      pubsub_topology_manager_pt manager = handle;
 +
 +      pubsub_endpoint_pt subcmp = NULL;
 +      if(pubsubEndpoint_createFromServiceReference(reference,&subcmp,false) == CELIX_SUCCESS){
 +
 +              int j,k;
 +
 +              // Inform discoveries that we are no longer interested in the topic
 +              celixThreadMutex_lock(&manager->discoveryListLock);
 +              hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
 +              while(hashMapIterator_hasNext(iter)){
 +                      service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
 +                      publisher_endpoint_announce_pt disc = NULL;
 +                      bundleContext_getService(manager->context, disc_sr, (void**) &disc);
-                       disc->uninterestedInTopic(disc->handle, subcmp->scope, subcmp->topic);
++                      /* disc stays NULL when the service could not be obtained */
++                      if(disc != NULL){
++                              disc->uninterestedInTopic(disc->handle, properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
++                      }
 +                      bundleContext_ungetService(manager->context, disc_sr, NULL);
 +              }
 +              hashMapIterator_destroy(iter);
 +              celixThreadMutex_unlock(&manager->discoveryListLock);
 +
 +              celixThreadMutex_lock(&manager->subscriptionsLock);
 +              celixThreadMutex_lock(&manager->psaListLock);
 +
-               char *sub_key = createScopeTopicKey(subcmp->scope,subcmp->topic);
++              char *sub_key = createScopeTopicKey(properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE),properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +              array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key);
 +              free(sub_key);
 +              if(sub_list_by_topic!=NULL){
 +                      j = 0;
 +                      while(j<arrayList_size(sub_list_by_topic)){
 +                              pubsub_endpoint_pt sub = arrayList_get(sub_list_by_topic,j);
 +                              if(!pubsubEndpoint_equals(sub,subcmp)){
 +                                      /* Some other subscriber on the same scope/topic: leave it alone */
 +                                      j++;
 +                                      continue;
 +                              }
 +
 +                              for(k=0;k<arrayList_size(manager->psaList);k++){
 +                                      /* No problem with invoking removal on all psa's, only the one that manage this topic will do something */
 +                                      pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
 +                                      psa->removeSubscription(psa->admin,sub);
 +                              }
 +
 +                              /* j is not advanced: after the removal the next element sits at index j */
 +                              arrayList_remove(sub_list_by_topic,j);
 +
 +                              /* If it was the last subscriber for this topic, tell PSA to close the ZMQ socket */
 +                              if(arrayList_size(sub_list_by_topic)==0){
 +                                      for(k=0;k<arrayList_size(manager->psaList);k++){
 +                                              pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-                                               psa->closeAllSubscriptions(psa->admin,sub->scope, sub->topic);
++                                              psa->closeAllSubscriptions(psa->admin, (char*) properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +                                      }
 +                              }
 +
 +                              pubsubEndpoint_destroy(sub);
 +
 +                      }
 +              }
 +
 +              celixThreadMutex_unlock(&manager->psaListLock);
 +              celixThreadMutex_unlock(&manager->subscriptionsLock);
 +
 +              pubsubEndpoint_destroy(subcmp);
 +
 +      }
 +      else{
 +              status=CELIX_INVALID_BUNDLE_CONTEXT;
 +      }
 +
 +      return status;
 +
 +}
 +
 +/**
 + * Service tracker callback for a newly available discovery service.
 + *
 + * The reference is recorded in the discovery map. Every known publication of
 + * this framework that has an endpoint URL is announced to the discovery, and
 + * the discovery is told about the scope/topic of every known subscription.
 + *
 + * @param handle    the topology manager
 + * @param reference service reference of the discovery
 + * @param service   the discovery (publisher endpoint announce) service
 + * @return CELIX_INVALID_BUNDLE_CONTEXT when the framework UUID is unavailable,
 + *         otherwise the sum of the announcePublisher statuses
 + */
 +celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service_reference_pt reference, void* service) {
 +      celix_status_t status = CELIX_SUCCESS;
 +      pubsub_topology_manager_pt manager = (pubsub_topology_manager_pt)handle;
 +      publisher_endpoint_announce_pt disc = (publisher_endpoint_announce_pt)service;
 +
 +      const char* fwUUID = NULL;
 +
 +      bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
 +      if(fwUUID==NULL){
 +              printf("PSD: ERRROR: Cannot retrieve fwUUID.\n");
 +              return CELIX_INVALID_BUNDLE_CONTEXT;
 +      }
 +
 +      celixThreadMutex_lock(&manager->publicationsLock);
 +
 +      celixThreadMutex_lock(&manager->discoveryListLock);
 +      hashMap_put(manager->discoveryList, reference, NULL);
 +      celixThreadMutex_unlock(&manager->discoveryListLock);
 +
 +      hash_map_iterator_pt iter = hashMapIterator_create(manager->publications);
 +      while(hashMapIterator_hasNext(iter)){
 +              array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter);
 +              for(int i = 0; i < arrayList_size(pubEP_list); i++) {
 +                      pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
-                       if( (strcmp(pubEP->frameworkUUID,fwUUID)==0) && (pubEP->endpoint!=NULL)){
++                      /* Fetch the properties first: strcmp on a missing (NULL) value is undefined */
++                      const char* epUUID = properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID);
++                      const char* epURL = properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL);
++                      if( (epUUID!=NULL) && (strcmp(epUUID,fwUUID)==0) && (epURL!=NULL)){
 +                              status += disc->announcePublisher(disc->handle,pubEP);
 +                      }
 +              }
 +      }
 +      hashMapIterator_destroy(iter);
 +
 +      celixThreadMutex_unlock(&manager->publicationsLock);
 +
 +      celixThreadMutex_lock(&manager->subscriptionsLock);
 +      iter = hashMapIterator_create(manager->subscriptions);
 +
 +      while(hashMapIterator_hasNext(iter)) {
 +              array_list_pt l = (array_list_pt)hashMapIterator_nextValue(iter);
 +              int i;
 +              for(i=0;i<arrayList_size(l);i++){
 +                      pubsub_endpoint_pt subEp = (pubsub_endpoint_pt)arrayList_get(l,i);
 +
-                       disc->interestedInTopic(disc->handle, subEp->scope, subEp->topic);
++                      disc->interestedInTopic(disc->handle, properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +              }
 +      }
 +      hashMapIterator_destroy(iter);
 +      celixThreadMutex_unlock(&manager->subscriptionsLock);
 +
 +      return status;
 +}
 +
 +/**
 + * Service tracker callback for a modified discovery service: handled as a
 + * removal followed by an addition.
 + *
 + * @param handle    the topology manager
 + * @param reference service reference of the discovery
 + * @param service   the discovery service
 + * @return the status of the removal when it fails, otherwise the status of
 + *         the addition
 + */
 +celix_status_t pubsub_topologyManager_pubsubDiscoveryModified(void * handle, service_reference_pt reference, void * service) {
 +      celix_status_t removeStatus = pubsub_topologyManager_pubsubDiscoveryRemoved(handle, reference, service);
 +
 +      /* Only re-add when the old registration was dropped cleanly */
 +      if (removeStatus != CELIX_SUCCESS) {
 +              return removeStatus;
 +      }
 +
 +      return pubsub_topologyManager_pubsubDiscoveryAdded(handle, reference, service);
 +}
 +
 +/**
 + * Service tracker callback for a discovery service that goes away: the
 + * reference is dropped from the discovery map and the removal is logged.
 + *
 + * @param handle    the topology manager
 + * @param reference service reference of the discovery
 + * @param service   the discovery service (unused)
 + * @return always CELIX_SUCCESS
 + */
 +celix_status_t pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, service_reference_pt reference, void * service) {
 +      celix_status_t status = CELIX_SUCCESS;
 +
 +      pubsub_topology_manager_pt manager = handle;
 +
 +      celixThreadMutex_lock(&manager->discoveryListLock);
 +
 +      /*
 +       * Discoveries are stored with a NULL value (see pubsubDiscoveryAdded), so
 +       * the value handed back by hashMap_remove cannot tell whether an entry
 +       * existed; check for the key instead.
 +       */
 +      bool wasKnown = hashMap_containsKey(manager->discoveryList, reference);
 +      hashMap_remove(manager->discoveryList, reference);
 +
 +      if (wasKnown) {
 +              logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "EndpointListener Removed");
 +      }
 +
 +      celixThreadMutex_unlock(&manager->discoveryListLock);
 +
 +      return status;
 +}
 +
 +
 +/**
 + * Listener hook callback for newly requested publisher services.
 + *
 + * For each listener info an endpoint is built and stored under its
 + * scope/topic key. The best matching pubsub admin (highest score above zero)
 + * is asked to add the publication; when that succeeds the publication is
 + * announced to every known discovery.
 + *
 + * @param handle    the topology manager
 + * @param listeners list of listener_hook_info_pt entries
 + * @return CELIX_SUCCESS, or the status of the first addPublication call that
 + *         failed
 + */
 +celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_list_pt listeners) {
 +
 +      celix_status_t status = CELIX_SUCCESS;
 +      pubsub_topology_manager_pt manager = handle;
 +
 +      int l_index;
 +
 +      for (l_index = 0; l_index < arrayList_size(listeners); l_index++) {
 +
 +              listener_hook_info_pt info = arrayList_get(listeners, l_index);
 +
 +              pubsub_endpoint_pt pub = NULL;
 +              if(pubsubEndpoint_createFromListenerHookInfo(info, &pub, true) == CELIX_SUCCESS){
 +
 +                      celixThreadMutex_lock(&manager->publicationsLock);
-                       char *pub_key = createScopeTopicKey(pub->scope, pub->topic);
++                      char *pub_key = createScopeTopicKey(properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +                      array_list_pt pub_list_by_topic = hashMap_get(manager->publications, pub_key);
 +                      if(pub_list_by_topic==NULL){
 +                              arrayList_create(&pub_list_by_topic);
 +                              /* The map takes over the freshly allocated key; no unchecked strdup followed by free */
 +                              hashMap_put(manager->publications,pub_key,pub_list_by_topic);
 +                      }
 +                      else{
 +                              free(pub_key);
 +                      }
 +                      arrayList_add(pub_list_by_topic,pub);
 +
 +                      celixThreadMutex_unlock(&manager->publicationsLock);
 +
 +                      int j;
 +                      double score = 0;
 +                      double best_score = 0;
 +                      pubsub_admin_service_pt best_psa = NULL;
 +                      celixThreadMutex_lock(&manager->psaListLock);
 +
 +                      for(j=0;j<arrayList_size(manager->psaList);j++){
 +                              pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
 +                              psa->matchEndpoint(psa->admin,pub,&score);
 +                              if(score>best_score){ /* We have a new winner! */
 +                                      best_score = score;
 +                                      best_psa = psa;
 +                              }
 +                      }
 +
 +                      if(best_psa != NULL && best_score>0){
 +                              celix_status_t addStatus = best_psa->addPublication(best_psa->admin,pub);
 +                              if(addStatus==CELIX_SUCCESS){
 +                                      celixThreadMutex_lock(&manager->discoveryListLock);
 +                                      hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
 +                                      while(hashMapIterator_hasNext(iter)){
 +                                              service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter);
 +                                              publisher_endpoint_announce_pt disc = NULL;
 +                                              bundleContext_getService(manager->context, disc_sr, (void**) &disc);
 +                                              /* disc stays NULL when the service could not be obtained */
 +                                              if(disc != NULL){
 +                                                      disc->announcePublisher(disc->handle,pub);
 +                                              }
 +                                              bundleContext_ungetService(manager->context, disc_sr, NULL);
 +                                      }
 +                                      hashMapIterator_destroy(iter);
 +                                      celixThreadMutex_unlock(&manager->discoveryListLock);
 +                              }
 +                              else if(status==CELIX_SUCCESS){
 +                                      /* Keep the first failure; a later listener must not overwrite it */
 +                                      status = addStatus;
 +                              }
 +                      }
 +
 +                      celixThreadMutex_unlock(&manager->psaListLock);
 +
 +              }
 +
 +      }
 +
 +      return status;
 +
 +}
 +
 +
 +celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, 
array_list_pt listeners) { /* Handles listener-hook infos for publishers that are going away.
 +       * handle is used as the pubsub_topology_manager_pt and listeners is walked as a list of listener_hook_info_pt.
 +       * For each info a comparison endpoint (pubcmp) is created; under psaListLock and publicationsLock the list stored
 +       * for its scope/topic key is searched for an endpoint equal to pubcmp. For a match, removePublication is called on
 +       * every PSA; each PSA that reports CELIX_SUCCESS triggers removePublisher on every service in discoveryList.
 +       * The match is then removed from the list, closeAllPublications is called on every PSA when the list became empty,
 +       * and the stored endpoint is destroyed.
 +       * Returns the last status produced by removePublication, with CELIX_ILLEGAL_ARGUMENT mapped to CELIX_SUCCESS;
 +       * CELIX_SUCCESS when no PSA call was made. */
 +      celix_status_t status = CELIX_SUCCESS;
 +      pubsub_topology_manager_pt manager = handle;
 +
 +      int l_index;
 +
 +      for (l_index = 0; l_index < arrayList_size(listeners); l_index++) {
 +
 +              listener_hook_info_pt info = arrayList_get(listeners, l_index);
 +
 +              pubsub_endpoint_pt pubcmp = NULL;
 +              if(pubsubEndpoint_createFromListenerHookInfo(info,&pubcmp,true) 
== CELIX_SUCCESS){
 +
 +
 +                      int j,k;
 +                      celixThreadMutex_lock(&manager->psaListLock); /* lock order here: psaListLock, then publicationsLock, then (nested) discoveryListLock */
 +                      celixThreadMutex_lock(&manager->publicationsLock);
 +
 -                       char *pub_key = createScopeTopicKey(pubcmp->scope, 
pubcmp->topic);
++                      char *pub_key = 
createScopeTopicKey(properties_get(pubcmp->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(pubcmp->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 +                      array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications,pub_key);
 +                      if(pub_list_by_topic!=NULL){
 +                              
for(j=0;j<arrayList_size(pub_list_by_topic);j++){
 +                                      pubsub_endpoint_pt pub = 
arrayList_get(pub_list_by_topic,j);
 +                                      if(pubsubEndpoint_equals(pub,pubcmp)){
 +                                              
for(k=0;k<arrayList_size(manager->psaList);k++){
 +                                                      pubsub_admin_service_pt 
psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
 +                                                      status = 
psa->removePublication(psa->admin,pub);
 +                                                      
if(status==CELIX_SUCCESS){ /* We found the one that manages this endpoint */
 +                                                              
celixThreadMutex_lock(&manager->discoveryListLock);
 +                                                              
hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList);
 +                                                              
while(hashMapIterator_hasNext(iter)){
 +                                                                      
service_reference_pt disc_sr = 
(service_reference_pt)hashMapIterator_nextKey(iter);
 +                                                                      
publisher_endpoint_announce_pt disc = NULL;
 +                                                                      
bundleContext_getService(manager->context, disc_sr, (void**) &disc);
 +                                                                      
disc->removePublisher(disc->handle,pub);
 +                                                                      
bundleContext_ungetService(manager->context, disc_sr, NULL);
 +                                                              }
 +                                                              
hashMapIterator_destroy(iter);
 +                                                              
celixThreadMutex_unlock(&manager->discoveryListLock);
 +                                                      }
 +                                                      else if(status ==  
CELIX_ILLEGAL_ARGUMENT){ /* Not a real error, just saying this psa does not 
handle this endpoint */
 +                                                              status = 
CELIX_SUCCESS;
 +                                                      }
 +                                              }
 +                                              //}
 +                                              
arrayList_remove(pub_list_by_topic,j); /* NOTE(review): the j loop keeps incrementing after this removal; if arrayList_remove shifts later items down, the item that moves into slot j is never compared - confirm */
 +
 +                                              /* If it was the last publisher 
for this topic, tell PSA to close the ZMQ socket and then inform the discovery 
*/
 +                                              
if(arrayList_size(pub_list_by_topic)==0){
 +                                                      
for(k=0;k<arrayList_size(manager->psaList);k++){
 +                                                              
pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
 -                                                               
psa->closeAllPublications(psa->admin,pub->scope, pub->topic);
++                                                              
psa->closeAllPublications(psa->admin, (char*) 
properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) 
properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +                                                      }
 +                                              }
 +
 +                                              pubsubEndpoint_destroy(pub); /* releases the stored endpoint; pubcmp is released separately after the locks are dropped */
 +                                      }
 +
 +                              }
 +                      }
 +
 +                      celixThreadMutex_unlock(&manager->publicationsLock);
 +                      celixThreadMutex_unlock(&manager->psaListLock);
 +
 +                      free(pub_key);
 +
 +                      pubsubEndpoint_destroy(pubcmp);
 +
 +              }
 +
 +      }
 +
 +      return status;
 +}
 +
 +celix_status_t pubsub_topologyManager_announcePublisher(void *handle, 
pubsub_endpoint_pt pubEP){ /* Records a publisher endpoint announced to this topology manager (handle).
 +       * Logs the endpoint, then under psaListLock and publicationsLock stores a clone of pubEP in the list kept for
 +       * its scope/topic key (creating that list on first use). Every PSA scores the clone through matchEndpoint and
 +       * addPublication is called on the PSA with the highest score above zero.
 +       * Returns CELIX_ILLEGAL_STATE when no PSA scored above zero, CELIX_SUCCESS otherwise; the clone stays in the
 +       * list in both cases. */
 +      celix_status_t status = CELIX_SUCCESS;
 -       printf("PSTM: New publisher discovered for topic %s [fwUUID=%s, 
ep=%s]\n",pubEP->topic,pubEP->frameworkUUID,pubEP->endpoint);
++      printf("PSTM: New publisher discovered for topic %s [fwUUID=%s, 
ep=%s]\n",
++                 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
++                 properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),
++                 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +
 +      pubsub_topology_manager_pt manager = handle;
 +      celixThreadMutex_lock(&manager->psaListLock);
 +      celixThreadMutex_lock(&manager->publicationsLock);
 +
 -       char *pub_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
++      char *pub_key = 
createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 +
 +      array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications,pub_key);
 +      if(pub_list_by_topic==NULL){
 +              arrayList_create(&pub_list_by_topic);
 +              
hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic);
 +      }
 +      free(pub_key);
 +
 +      /* Shouldn't be any other duplicate, since it's filtered out by the 
discovery */
 +      pubsub_endpoint_pt p = NULL;
 +      pubsubEndpoint_clone(pubEP, &p);
 +      arrayList_add(pub_list_by_topic,p);
 +
 +      int j;
 +      double score = 0;
 +      double best_score = 0;
 +      pubsub_admin_service_pt best_psa = NULL;
 +
 +      for(j=0;j<arrayList_size(manager->psaList);j++){
 +              pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
 +              psa->matchEndpoint(psa->admin,p,&score);
 +              if(score>best_score){ /* We have a new winner! */
 +                      best_score = score;
 +                      best_psa = psa;
 +              }
 +      }
 +
 +      if(best_psa != NULL && best_score>0){
 +              best_psa->addPublication(best_psa->admin,p); /* the result of addPublication is not propagated into status */
 +      }
 +      else{
 +              status = CELIX_ILLEGAL_STATE;
 +      }
 +
 +      celixThreadMutex_unlock(&manager->publicationsLock);
 +      celixThreadMutex_unlock(&manager->psaListLock);
 +
 +      return status;
 +}
 +
 +celix_status_t pubsub_topologyManager_removePublisher(void *handle, 
pubsub_endpoint_pt pubEP){ /* Forgets a publisher endpoint previously stored by this topology manager (handle).
 +       * Logs the endpoint, then under psaListLock and publicationsLock looks up the list kept for its scope/topic key.
 +       * Without such a list an error is logged and CELIX_ILLEGAL_STATE is returned. Otherwise the list is searched
 +       * for an endpoint equal to pubEP; when one is found, removePublication is called on every PSA, the endpoint is
 +       * removed from the list, closeAllPublications is called on every PSA if the list became empty, and the stored
 +       * endpoint is destroyed. When nothing matches, nothing is changed and CELIX_SUCCESS is returned. */
 +      celix_status_t status = CELIX_SUCCESS;
 -       printf("PSTM: Publisher removed for topic %s [fwUUID=%s, 
ep=%s]\n",pubEP->topic,pubEP->frameworkUUID,pubEP->endpoint);
++      printf("PSTM: Publisher removed for topic %s [fwUUID=%s, ep=%s]\n",
++                 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
++                 properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),
++                 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +
 +      pubsub_topology_manager_pt manager = handle;
 +      celixThreadMutex_lock(&manager->psaListLock);
 +      celixThreadMutex_lock(&manager->publicationsLock);
 +      int i;
 +
 -       char *pub_key = createScopeTopicKey(pubEP->scope, pubEP->topic);
++      char *pub_key = 
createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 +      array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications,pub_key);
 +      if(pub_list_by_topic==NULL){
 -               printf("PSTM: ERROR: Cannot find topic for known endpoint 
[%s,%s,%s]. Something is 
inconsistent.\n",pub_key,pubEP->frameworkUUID,pubEP->endpoint);
++              printf("PSTM: ERROR: Cannot find topic for known endpoint 
[%s,%s,%s]. Something is 
inconsistent.\n",pub_key,properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL));
 +              status = CELIX_ILLEGAL_STATE;
 +      }
 +      else{
 +
 +              pubsub_endpoint_pt p = NULL;
 +              bool found = false;
 +
 +              for(i=0;!found && i<arrayList_size(pub_list_by_topic);i++){
 +                      p = 
(pubsub_endpoint_pt)arrayList_get(pub_list_by_topic,i);
 +                      found = pubsubEndpoint_equals(p,pubEP);
 +              }
 +
 +              if(found && p !=NULL){ /* p holds the last element examined, so found decides whether it is the match */
 +
 +                      for(i=0;i<arrayList_size(manager->psaList);i++){
 +                              pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
 +                              /* No problem with invoking removal on all 
psa's, only the one that manage this topic will do something */
 +                              psa->removePublication(psa->admin,p);
 +                      }
 +
 +                      arrayList_removeElement(pub_list_by_topic,p);
 +
 +                      /* If it was the last publisher for this topic, tell 
PSA to close the ZMQ socket */
 +                      if(arrayList_size(pub_list_by_topic)==0){
 +
 +                              for(i=0;i<arrayList_size(manager->psaList);i++){
 +                                      pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
 -                                       
psa->closeAllPublications(psa->admin,p->scope, p->topic);
++                                      psa->closeAllPublications(psa->admin, 
(char*) properties_get(p->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) 
properties_get(p->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +                              }
 +                      }
 +
 +                      pubsubEndpoint_destroy(p);
 +              }
 +
 +
 +      }
 +      free(pub_key);
 +      celixThreadMutex_unlock(&manager->publicationsLock);
 +      celixThreadMutex_unlock(&manager->psaListLock);
 +
 +
 +      return status;
 +}
 +

http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/utils/include/properties.h
----------------------------------------------------------------------
diff --cc utils/include/properties.h
index 5c6dc4d,0000000..582a242
mode 100644,000000..100644
--- a/utils/include/properties.h
+++ b/utils/include/properties.h
@@@ -1,68 -1,0 +1,70 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +/*
 + * properties.h
 + *
 + *  \date       Apr 27, 2010
 + *  \author           <a href="mailto:d...@celix.apache.org";>Apache Celix 
Project Team</a>
 + *  \copyright        Apache License, Version 2.0
 + */
 +
 +#ifndef PROPERTIES_H_
 +#define PROPERTIES_H_
 +
 +#include <stdio.h>
 +
 +#include "hash_map.h"
 +#include "exports.h"
 +#include "celix_errno.h"
 +#ifdef __cplusplus
 +extern "C" {
 +#endif
 +typedef hash_map_pt properties_pt;
 +typedef hash_map_t properties_t;
 +/* Creates a new, empty property set. */
 +UTILS_EXPORT properties_pt properties_create(void);
 +/* Frees every stored key and value, then the set itself. */
 +UTILS_EXPORT void properties_destroy(properties_pt properties);
 +/* Loads a property set from the named file; returns NULL when the file cannot be opened for reading. */
 +UTILS_EXPORT properties_pt properties_load(const char *filename);
 +/* Loads a property set from an open stream, one entry per line; returns NULL when stream is NULL. */
 +UTILS_EXPORT properties_pt properties_loadWithStream(FILE *stream);
 +/* Parses the newline-separated lines of input (working on a private copy) into a new property set. */
 +UTILS_EXPORT properties_pt properties_loadFromString(const char *input);
 +/* Writes all entries to file as key=value lines, putting a backslash before # ! = and : characters; header is currently ignored. */
 +UTILS_EXPORT void properties_store(properties_pt properties, const char 
*file, const char *header);
 +/* Looks key up in the set and returns the stored value; properties_getWithDefault treats a NULL result as absent. */
 +UTILS_EXPORT const char *properties_get(properties_pt properties, const char 
*key);
 +/* Same lookup as properties_get, but yields defaultValue when the lookup result is NULL. */
 +UTILS_EXPORT const char *properties_getWithDefault(properties_pt properties, 
const char *key, const char *defaultValue);
 +/* Stores copies of key and value (each copy limited to 10240 bytes); an existing value for key is replaced and freed. */
 +UTILS_EXPORT void properties_set(properties_pt properties, const char *key, 
const char *value);
 +/* Removes the entry for key and frees its stored value. */
++UTILS_EXPORT void properties_unset(properties_pt properties, const char *key);
++/* Copies every entry into a newly created set handed back through copy; returns CELIX_ENOMEM when that set cannot be created. */
 +UTILS_EXPORT celix_status_t properties_copy(properties_pt properties, 
properties_pt *copy);
 +/* Loops over the keys of props: declares a local iterator named iter and assigns each key to key; the loop condition is the assigned key, so it stops at the first NULL key. */
 +#define PROPERTIES_FOR_EACH(props, key) \
 +    for(hash_map_iterator_t iter = hashMapIterator_construct(props); \
 +        hashMapIterator_hasNext(&iter), (key) = (const 
char*)hashMapIterator_nextKey(&iter);)
 +#ifdef __cplusplus
 +}
 +#endif
 +
 +#endif /* PROPERTIES_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/utils/src/properties.c
----------------------------------------------------------------------
diff --cc utils/src/properties.c
index 1e097a0,0000000..860b9bb
mode 100644,000000..100644
--- a/utils/src/properties.c
+++ b/utils/src/properties.c
@@@ -1,330 -1,0 +1,335 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +/*
 + * properties.c
 + *
 + *  \date       Apr 27, 2010
 + *  \author           <a href="mailto:d...@celix.apache.org";>Apache Celix 
Project Team</a>
 + *  \copyright        Apache License, Version 2.0
 + */
 +#include <stdio.h>
 +#include <string.h>
 +#include <stdlib.h>
 +#include <ctype.h>
 +#include "celixbool.h"
 +#include "properties.h"
 +#include "utils.h"
 +
 +#define MALLOC_BLOCK_SIZE             5
 +
 +static void parseLine(const char* line, properties_pt props);
 +
 +/**
 + * Creates a new, empty property set.
 + *
 + * The set is a hash map configured with the string hash and string equality
 + * helpers for both its keys and its values.
 + *
 + * @return the map handle produced by hashMap_create.
 + */
 +properties_pt properties_create(void) {
 +      properties_pt props = hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals);
 +      return props;
 +}
 +
 +/**
 + * Releases a property set together with every key and value stored in it.
 + *
 + * Keys and values are freed here one entry at a time; the map itself is then
 + * destroyed with both of its free flags set to false.
 + *
 + * @param properties the set to release.
 + */
 +void properties_destroy(properties_pt properties) {
 +      hash_map_iterator_pt it = hashMapIterator_create(properties);
 +      for (; hashMapIterator_hasNext(it); ) {
 +              hash_map_entry_pt current = hashMapIterator_nextEntry(it);
 +              void *storedKey = hashMapEntry_getKey(current);
 +              void *storedValue = hashMapEntry_getValue(current);
 +              free(storedKey);
 +              free(storedValue);
 +      }
 +      hashMapIterator_destroy(it);
 +      hashMap_destroy(properties, false, false);
 +}
 +
 +/**
 + * Loads a property set from a file.
 + *
 + * @param filename path of the file, opened in "r" mode.
 + * @return the set produced by properties_loadWithStream, or NULL when the
 + *         file cannot be opened.
 + */
 +properties_pt properties_load(const char* filename) {
 +      properties_pt loaded = NULL;
 +      FILE *stream = fopen(filename, "r");
 +
 +      if (stream != NULL) {
 +              loaded = properties_loadWithStream(stream);
 +              fclose(stream);
 +      }
 +      return loaded;
 +}
 +
 +/**
 + * Loads a property set from an already opened stream.
 + *
 + * The stream size is determined with fseek/ftell, the whole content is read
 + * into one buffer and every newline-separated line is handed to parseLine.
 + *
 + * @param file stream to read; may be NULL.
 + * @return NULL when file is NULL or the set cannot be created; otherwise a
 + *         property set, which is empty when the stream is empty, its size
 + *         cannot be determined (for example a non-seekable stream) or the
 + *         read buffer cannot be allocated.
 + */
 +properties_pt properties_loadWithStream(FILE *file) {
 +      if (file == NULL) {
 +              return NULL;
 +      }
 +
 +      properties_pt props = properties_create();
 +      if (props == NULL) {
 +              return NULL;
 +      }
 +
 +      /* ftell reports failure as -1L; keep the result signed so a failure is
 +       * never turned into a huge size_t (and file_size + 1 into 0). */
 +      long end = -1L;
 +      if (fseek(file, 0, SEEK_END) == 0) {
 +              end = ftell(file);
 +      }
 +      if (end <= 0 || fseek(file, 0, SEEK_SET) != 0) {
 +              return props;
 +      }
 +
 +      size_t file_size = (size_t) end;
 +      char *filebuffer = calloc(file_size + 1, sizeof(char));
 +      if (filebuffer == NULL) {
 +              return props;
 +      }
 +
 +      size_t rs = fread(filebuffer, sizeof(char), file_size, file);
 +      if (rs != file_size) {
 +              fprintf(stderr,"fread read only %zu bytes out of %zu\n",rs,file_size);
 +      }
 +      /* calloc zeroed the buffer, so a short read is terminated as well */
 +      filebuffer[file_size]='\0';
 +
 +      char *saveptr = NULL;
 +      for (char *line = strtok_r(filebuffer, "\n", &saveptr); line != NULL; line = strtok_r(NULL, "\n", &saveptr)) {
 +              parseLine(line, props);
 +      }
 +      free(filebuffer);
 +
 +      return props;
 +}
 +
 +/**
 + * Builds a property set from a text holding one entry per line.
 + *
 + * The input is duplicated first because strtok_r modifies the text it splits.
 + *
 + * @param input newline-separated text; may be NULL.
 + * @return a new property set; it is empty when input is NULL or the private
 + *         copy of input cannot be allocated.
 + */
 +properties_pt properties_loadFromString(const char *input){
 +      properties_pt props = properties_create();
 +
 +      if (input == NULL) {
 +              return props;
 +      }
 +
 +      char *in = strdup(input);
 +      if (in == NULL) {
 +              return props;
 +      }
 +
 +      char *saveLinePointer = NULL;
 +      for (char *line = strtok_r(in, "\n", &saveLinePointer); line != NULL; line = strtok_r(NULL, "\n", &saveLinePointer)) {
 +              parseLine(line, props);
 +      }
 +
 +      free(in);
 +
 +      return props;
 +}
 +
 +
 +/* Writes text to file, putting a backslash before each '#', '!', '=' and ':'. */
 +static void properties_writeEscaped(FILE *file, const char *text) {
 +      for (const char *c = text; *c != '\0'; ++c) {
 +              switch (*c) {
 +              case '#':
 +              case '!':
 +              case '=':
 +              case ':':
 +                      fputc('\\', file);
 +                      break;
 +              default:
 +                      break;
 +              }
 +              fputc(*c, file);
 +      }
 +}
 +
 +/**
 + * Writes every entry of the set to a file as one "key=value" line.
 + *
 + * The file is opened in "w+" mode. The characters '#', '!', '=' and ':' are
 + * written with a preceding backslash in both keys and values. When the file
 + * cannot be opened, the failure is reported through perror.
 + *
 + * @param properties the set to write.
 + * @param filename   path of the file to write.
 + * @param header     ignored for now, comments cannot be handled yet.
 + */
 +void properties_store(properties_pt properties, const char* filename, const char* header) {
 +      FILE *file = fopen(filename, "w+");
 +
 +      if (file == NULL) {
 +              perror("File is null");
 +              return;
 +      }
 +
 +      if (hashMap_size(properties) > 0) {
 +              hash_map_iterator_pt it = hashMapIterator_create(properties);
 +              while (hashMapIterator_hasNext(it)) {
 +                      hash_map_entry_pt current = hashMapIterator_nextEntry(it);
 +
 +                      properties_writeEscaped(file, hashMapEntry_getKey(current));
 +                      fputc('=', file);
 +                      properties_writeEscaped(file, hashMapEntry_getValue(current));
 +                      fputc('\n', file);
 +              }
 +              hashMapIterator_destroy(it);
 +      }
 +      fclose(file);
 +}
 +
 +/**
 + * Duplicates a property set.
 + *
 + * Every entry is added to the new set through properties_set, so the copy
 + * holds its own key and value strings.
 + *
 + * @param properties the set to copy.
 + * @param out        receives the new set; left untouched on failure.
 + * @return CELIX_SUCCESS, or CELIX_ENOMEM when the new set cannot be created.
 + */
 +celix_status_t properties_copy(properties_pt properties, properties_pt *out) {
 +      properties_pt duplicate = properties_create();
 +
 +      if (duplicate == NULL) {
 +              return CELIX_ENOMEM;
 +      }
 +
 +      hash_map_iterator_pt it = hashMapIterator_create(properties);
 +      while (hashMapIterator_hasNext(it)) {
 +              hash_map_entry_pt current = hashMapIterator_nextEntry(it);
 +              char *entryKey = hashMapEntry_getKey(current);
 +              char *entryValue = hashMapEntry_getValue(current);
 +              properties_set(duplicate, entryKey, entryValue);
 +      }
 +      hashMapIterator_destroy(it);
 +
 +      *out = duplicate;
 +      return CELIX_SUCCESS;
 +}
 +
 +/**
 + * Looks up the value stored for a key.
 + *
 + * @param properties the set to search.
 + * @param key        the key to look up.
 + * @return whatever hashMap_get yields for key.
 + */
 +const char* properties_get(properties_pt properties, const char* key) {
 +      const char* found = hashMap_get(properties, (void*)key);
 +      return found;
 +}
 +
 +/**
 + * Looks up the value stored for a key, with a fallback.
 + *
 + * @param properties   the set to search.
 + * @param key          the key to look up.
 + * @param defaultValue returned when properties_get yields NULL.
 + * @return the stored value, or defaultValue.
 + */
 +const char* properties_getWithDefault(properties_pt properties, const char* key, const char* defaultValue) {
 +      const char* found = properties_get(properties, key);
 +      if (found != NULL) {
 +              return found;
 +      }
 +      return defaultValue;
 +}
 +
 +/**
 + * Stores a value for a key, replacing any value already stored for it.
 + *
 + * The set keeps its own copies: the value (and, for a new entry, the key) is
 + * duplicated with strndup, limited to 1024*10 bytes each. When the key is
 + * already present its stored key string is reused and the previous value is
 + * freed after the new one has been put into the map.
 + *
 + * @param properties the set to modify.
 + * @param key        the key to store under.
 + * @param value      the value to store.
 + */
 +void properties_set(properties_pt properties, const char* key, const char* value) {
 +      const size_t maxLen = 1024*10;
 +      hash_map_entry_pt existing = hashMap_getEntry(properties, key);
 +
 +      if (existing == NULL) {
 +              hashMap_put(properties, strndup(key, maxLen), strndup(value, maxLen));
 +              return;
 +      }
 +
 +      char* storedKey = hashMapEntry_getKey(existing);
 +      char* previous = hashMapEntry_getValue(existing);
 +      hashMap_put(properties, storedKey, strndup(value, maxLen));
 +      free(previous);
 +}
 +
++/**
++ * Removes the entry stored for a key and releases its memory.
++ *
++ * properties_set stores a heap copy of the key and properties_destroy frees
++ * keys itself, so the map does not release key strings on its own. The stored
++ * key is therefore fetched before the entry is removed and freed together
++ * with the value; freeing only the value would leak the key copy.
++ *
++ * @param properties the set to modify.
++ * @param key        the key to remove; nothing happens when it is not present.
++ */
++void properties_unset(properties_pt properties, const char* key) {
++      hash_map_entry_pt entry = hashMap_getEntry(properties, key);
++      if (entry == NULL) {
++              return;
++      }
++
++      char* storedKey = hashMapEntry_getKey(entry);
++      char* oldValue = hashMap_remove(properties, key);
++      /* the lookup inside hashMap_remove is finished, the key copy can go now */
++      free(storedKey);
++      free(oldValue);
++}
++
 +/**
 + * Grows the buffer that output currently points at once it is nearly full.
 + *
 + * output is compared with the key buffer to decide which of the two buffers
 + * is being written. When outputPos has reached the last slot of that buffer,
 + * its recorded length is raised by MALLOC_BLOCK_SIZE, the buffer is
 + * reallocated and output is pointed at the (possibly moved) buffer.
 + *
 + * @param key       address of the key buffer pointer.
 + * @param value     address of the value buffer pointer.
 + * @param output    address of the pointer to the buffer being written.
 + * @param outputPos next write position inside that buffer.
 + * @param key_len   recorded length of the key buffer.
 + * @param value_len recorded length of the value buffer.
 + */
 +static void updateBuffers(char **key, char ** value, char **output, int outputPos, int *key_len, int *value_len) {
 +      const bool writingKey = (*output == *key);
 +      char **buffer = writingKey ? key : value;
 +      int *capacity = writingKey ? key_len : value_len;
 +
 +      if (outputPos != (*capacity) - 1) {
 +              return;
 +      }
 +
 +      (*capacity) += MALLOC_BLOCK_SIZE;
 +      *buffer = realloc(*buffer, *capacity);
 +      *output = *buffer;
 +}
 +
 +static void parseLine(const char* line, properties_pt props) { /* Parses one text line into a key and a value and stores them in props.
 +       * Spaces and tabs before the first other character are skipped. Characters are copied into the key buffer until
 +       * the first unescaped = or : is met; from there on they are copied into the value buffer. A backslash marks the
 +       * next = : # or ! as a literal character. An unescaped # or ! met while the current buffer is still empty marks
 +       * the line as a comment and nothing is stored. Otherwise the trimmed key and value are handed to properties_set.
 +       * Both working buffers start at MALLOC_BLOCK_SIZE bytes, are grown through updateBuffers and are freed at the end. */
 +      int linePos = 0;
 +      bool precedingCharIsBackslash = false;
 +      bool isComment = false;
 +      int outputPos = 0;
 +      char *output = NULL;
 +      int key_len = MALLOC_BLOCK_SIZE;
 +      int value_len = MALLOC_BLOCK_SIZE;
 +      linePos = 0;
 +      precedingCharIsBackslash = false;
 +      isComment = false;
 +      output = NULL;
 +      outputPos = 0;
 +
 +      //Ignore empty lines
 +      if (line[0] == '\n' && line[1] == '\0') {
 +              return;
 +      }
 +
 +      char *key = calloc(1, key_len);
 +      char *value = calloc(1, value_len);
 +      key[0] = '\0';
 +      value[0] = '\0';
 +
 +      while (line[linePos] != '\0') {
 +              if (line[linePos] == ' ' || line[linePos] == '\t') {
 +                      if (output == NULL) {
 +                              //ignore
 +                              linePos += 1;
 +                              continue;
 +                      }
 +              }
 +              else {
 +                      if (output == NULL) {
 +                              output = key; /* first character that is not a space or tab: start filling the key */
 +                      }
 +              }
 +              if (line[linePos] == '=' || line[linePos] == ':' || 
line[linePos] == '#' || line[linePos] == '!') {
 +                      if (precedingCharIsBackslash) {
 +                              //escaped special character
 +                              output[outputPos++] = line[linePos];
 +                              updateBuffers(&key, &value, &output, outputPos, 
&key_len, &value_len);
 +                              precedingCharIsBackslash = false;
 +                      }
 +                      else {
 +                              if (line[linePos] == '#' || line[linePos] == 
'!') {
 +                                      if (outputPos == 0) { /* NOTE(review): outputPos is also 0 right after the separator, so a value starting with # or ! drops the whole line - confirm this is intended */
 +                                              isComment = true;
 +                                              break;
 +                                      }
 +                                      else {
 +                                              output[outputPos++] = 
line[linePos];
 +                                              updateBuffers(&key, &value, 
&output, outputPos, &key_len, &value_len);
 +                                      }
 +                              }
 +                              else { // = or :
 +                                      if (output == value) { //already have a 
seperator
 +                                              output[outputPos++] = 
line[linePos];
 +                                              updateBuffers(&key, &value, 
&output, outputPos, &key_len, &value_len);
 +                                      }
 +                                      else {
 +                                              output[outputPos++] = '\0'; /* terminate the key, then switch to writing the value from position 0 */
 +                                              updateBuffers(&key, &value, 
&output, outputPos, &key_len, &value_len);
 +                                              output = value;
 +                                              outputPos = 0;
 +                                      }
 +                              }
 +                      }
 +              }
 +              else if (line[linePos] == '\\') {
 +                      if (precedingCharIsBackslash) { //double backslash -> 
backslash
 +                              output[outputPos++] = '\\';
 +                              updateBuffers(&key, &value, &output, outputPos, 
&key_len, &value_len);
 +                      }
 +                      precedingCharIsBackslash = true;
 +              }
 +              else { //normal character
 +                      precedingCharIsBackslash = false;
 +                      output[outputPos++] = line[linePos];
 +                      updateBuffers(&key, &value, &output, outputPos, 
&key_len, &value_len);
 +              }
 +              linePos += 1;
 +      }
 +      if (output != NULL) {
 +              output[outputPos] = '\0';
 +      }
 +
 +      if (!isComment) {
 +              //printf("putting 'key'/'value' '%s'/'%s' in properties\n", 
utils_stringTrim(key), utils_stringTrim(value));
 +              properties_set(props, utils_stringTrim(key), 
utils_stringTrim(value));
 +      }
 +      if(key) {
 +              free(key);
 +      }
 +      if(value) {
 +              free(value);
 +      }
 +
 +}

Reply via email to