Repository: celix
Updated Branches:
  refs/heads/feature/CELIX-454-pubsub-disc e30a70f72 -> 69596cfdf


http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
----------------------------------------------------------------------
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 834e3a8..85c67e9 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -16,24 +16,19 @@
  *specific language governing permissions and limitations
  *under the License.
  */
-/*
- * pubsub_topology_manager.c
- *
- *  \date       Sep 29, 2011
- *  \author            <a href="mailto:d...@celix.apache.org";>Apache Celix 
Project Team</a>
- *  \copyright Apache License, Version 2.0
- */
+
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <stdbool.h>
 #include <celix_api.h>
+#include <pubsub_utils.h>
+#include <assert.h>
 
 #include "hash_map.h"
-#include "array_list.h"
-#include "bundle_context.h"
+#include "celix_array_list.h"
+#include "celix_bundle_context.h"
 #include "constants.h"
-#include "listener_hook_service.h"
 #include "utils.h"
 #include "log_service.h"
 #include "log_helper.h"
@@ -42,697 +37,761 @@
 #include "pubsub_topology_manager.h"
 #include "pubsub_admin.h"
 
-static void print_endpoint_info(hash_map_pt endpoints, FILE *outStream) {
-       for(hash_map_iterator_t iter = hashMapIterator_construct(endpoints); 
hashMapIterator_hasNext(&iter);) {
-               const char* key = (const char*)hashMapIterator_nextKey(&iter);
-               fprintf(outStream, "    Topic=%s\n", key);
-               array_list_pt ep_list = hashMap_get(endpoints, key);
-               for(unsigned int i = 0; i < arrayList_size(ep_list); ++i) {
-                       pubsub_endpoint_pt ep = arrayList_get(ep_list, i);
-                       fprintf(outStream, "        Endpoint %d\n", i);
-                       fprintf(outStream, "            Endpoint properties\n");
-                       const char *propKey;
-                       if(ep->properties) {
-                               PROPERTIES_FOR_EACH(ep->properties, propKey) {
-                                       fprintf(outStream, "                %s 
=> %s\n", propKey, celix_properties_get(ep->properties, propKey, NULL));
-                               }
-                       }
-               }
-       }
+#define PSTM_CLEANUP_SLEEPTIME_IN_SECONDS       5L
 
-}
+static void* pstm_psaHandlingThread(void *data);
 
-static celix_status_t shellCommand(void *handle, char * commandLine, FILE 
*outStream, FILE *errorStream) {
-       pubsub_topology_manager_t *manager = (pubsub_topology_manager_t*) 
handle;
-       if (manager->publications && !hashMap_isEmpty(manager->publications)) {
-               fprintf(outStream, "Publications:\n");
-               print_endpoint_info(manager->publications, outStream);
-       }
-       if (manager->subscriptions && !hashMap_isEmpty(manager->subscriptions)) 
{
-               fprintf(outStream, "Subscriptions:\n");
-               print_endpoint_info(manager->subscriptions, outStream);
-       }
-       return CELIX_SUCCESS;
-}
-
-celix_status_t pubsub_topologyManager_create(bundle_context_pt context, 
log_helper_pt logHelper, pubsub_topology_manager_t **manager) {
+celix_status_t pubsub_topologyManager_create(bundle_context_pt context, 
log_helper_pt logHelper, pubsub_topology_manager_t **out) {
        celix_status_t status = CELIX_SUCCESS;
 
-       *manager = calloc(1, sizeof(**manager));
-       if (!*manager) {
+       pubsub_topology_manager_t *manager = calloc(1, sizeof(*manager));
+       if (manager == NULL) {
+               *out = NULL;
                return CELIX_ENOMEM;
+       } else {
+               *out = manager;
        }
 
-       (*manager)->context = context;
+       manager->context = context;
 
        celix_thread_mutexattr_t psaAttr;
        celixThreadMutexAttr_create(&psaAttr);
        celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE);
-       status |= celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr);
+       status |= celixThreadMutex_create(&manager->pubsubadmins.mutex, 
&psaAttr);
        celixThreadMutexAttr_destroy(&psaAttr);
 
-       status |= celixThreadMutex_create(&(*manager)->publicationsLock, NULL);
-       status |= celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL);
-       status |= celixThreadMutex_create(&(*manager)->discoveryListLock, NULL);
+       status |= celixThreadMutex_create(&manager->announcedEndpoints.mutex, 
NULL);
+       status |= celixThreadMutex_create(&manager->discoveredEndpoints.mutex, 
NULL);
+       status |= 
celixThreadMutex_create(&manager->announceEndpointListeners.mutex, NULL);
+       status |= celixThreadMutex_create(&manager->topicReceivers.mutex, NULL);
+       status |= celixThreadMutex_create(&manager->topicSenders.mutex, NULL);
+       status |= celixThreadMutex_create(&manager->psaHandling.mutex, NULL);
 
-       arrayList_create(&(*manager)->psaList);
+       status |= celixThreadCondition_init(&manager->psaHandling.cond, NULL);
 
-       (*manager)->discoveryList = hashMap_create(NULL, NULL, NULL, NULL);
-       (*manager)->publications = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
-       (*manager)->subscriptions = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
+       manager->announcedEndpoints.map = hashMap_create(NULL, NULL, NULL, 
NULL);
+       manager->discoveredEndpoints.map = hashMap_create(utils_stringHash, 
NULL, utils_stringEquals, NULL);
+       manager->announceEndpointListeners.list = celix_arrayList_create();
+       manager->pubsubadmins.map = hashMap_create(NULL, NULL, NULL, NULL);
+       manager->topicReceivers.map = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
+       manager->topicSenders.map = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL);
 
-       (*manager)->loghelper = logHelper;
-       (*manager)->shellCmdService.handle = *manager;
-       (*manager)->shellCmdService.executeCommand = shellCommand;
+       manager->loghelper = logHelper;
+       manager->verbose = celix_bundleContext_getPropertyAsBool(context, 
PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE);
 
-       (*manager)->verbose = PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE;
-       const char *verboseStr = NULL;
-       bundleContext_getProperty(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, 
&verboseStr);
-       if (verboseStr != NULL) {
-               (*manager)->verbose = strncasecmp("true", verboseStr, 
strlen("true")) == 0;
-       }
+       manager->psaHandling.running = true;
+       celixThread_create(&manager->psaHandling.thread, NULL, 
pstm_psaHandlingThread, manager);
+       celixThread_setName(&manager->psaHandling.thread, "PubSub 
TopologyManager");
 
-       properties_pt shellProps = properties_create();
-       properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "ps_info");
-       properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "ps_info");
-       properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "ps_info: 
Overview of PubSub");
-       bundleContext_registerService(context, OSGI_SHELL_COMMAND_SERVICE_NAME, 
&((*manager)->shellCmdService), shellProps, &((*manager)->shellCmdReg));
        return status;
 }
 
 celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t 
*manager) {
        celix_status_t status = CELIX_SUCCESS;
 
-       celixThreadMutex_lock(&manager->discoveryListLock);
-       hashMap_destroy(manager->discoveryList, false, false);
-       celixThreadMutex_unlock(&manager->discoveryListLock);
-       celixThreadMutex_destroy(&manager->discoveryListLock);
-
-       celixThreadMutex_lock(&manager->psaListLock);
-       arrayList_destroy(manager->psaList);
-       celixThreadMutex_unlock(&manager->psaListLock);
-       celixThreadMutex_destroy(&manager->psaListLock);
-
-       celixThreadMutex_lock(&manager->publicationsLock);
-       hash_map_iterator_pt pubit = 
hashMapIterator_create(manager->publications);
-       while(hashMapIterator_hasNext(pubit)){
-               array_list_pt l = 
(array_list_pt)hashMapIterator_nextValue(pubit);
-               unsigned int i;
-               for(i=0;i<arrayList_size(l);i++){
-                       
pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i));
-               }
-               arrayList_destroy(l);
-       }
-       hashMapIterator_destroy(pubit);
-       hashMap_destroy(manager->publications, true, false);
-       celixThreadMutex_unlock(&manager->publicationsLock);
-       celixThreadMutex_destroy(&manager->publicationsLock);
-
-       celixThreadMutex_lock(&manager->subscriptionsLock);
-       hash_map_iterator_pt subit = 
hashMapIterator_create(manager->subscriptions);
-       while(hashMapIterator_hasNext(subit)){
-               array_list_pt l = 
(array_list_pt)hashMapIterator_nextValue(subit);
-               unsigned int i;
-               for(i=0;i<arrayList_size(l);i++){
-                       
pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i));
-               }
-               arrayList_destroy(l);
-       }
-       hashMapIterator_destroy(subit);
-       hashMap_destroy(manager->subscriptions, true, false);
-       celixThreadMutex_unlock(&manager->subscriptionsLock);
-       celixThreadMutex_destroy(&manager->subscriptionsLock);
-       serviceRegistration_unregister(manager->shellCmdReg);
-       free(manager);
+       celixThreadMutex_lock(&manager->psaHandling.mutex);
+       manager->psaHandling.running = false;
+       celixThreadCondition_broadcast(&manager->psaHandling.cond);
+       celixThreadMutex_unlock(&manager->psaHandling.mutex);
+       celixThread_join(manager->psaHandling.thread, NULL);
 
-       return status;
-}
 
-void pubsub_topologyManager_psaAdded(void * handle, void *svc, const 
celix_properties_t *props __attribute__((unused))) {
-       pubsub_topology_manager_t *manager = handle;
-       unsigned int i;
+       celixThreadMutex_lock(&manager->announcedEndpoints.mutex);
+       hashMap_destroy(manager->announcedEndpoints.map, false, false);
+       celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
+       celixThreadMutex_destroy(&manager->announcedEndpoints.mutex);
 
-       pubsub_admin_service_pt psa = (pubsub_admin_service_pt) svc;
-       logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added 
PSA");
+       celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+       hashMap_destroy(manager->pubsubadmins.map, false, false);
+       celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+       celixThreadMutex_destroy(&manager->pubsubadmins.mutex);
 
-       celixThreadMutex_lock(&manager->psaListLock);
-       arrayList_add(manager->psaList, psa);
-       celixThreadMutex_unlock(&manager->psaListLock);
+       celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+       hashMap_destroy(manager->discoveredEndpoints.map, true, false);
+       celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+       celixThreadMutex_destroy(&manager->discoveredEndpoints.mutex);
 
-       // Add already detected subscriptions to new PSA
-       celixThreadMutex_lock(&manager->subscriptionsLock);
-       hash_map_iterator_pt subscriptionsIterator = 
hashMapIterator_create(manager->subscriptions);
-
-       //TODO FIXME no matching used, should only add unmatched subscribers ?
-       //NOTE this is a bug which occurs when psa are started after bundles 
that uses the PSA
-       while (hashMapIterator_hasNext(subscriptionsIterator)) {
-               array_list_pt sub_ep_list = 
hashMapIterator_nextValue(subscriptionsIterator);
-               for(i=0;i<arrayList_size(sub_ep_list);i++){
-                       psa->addSubscription(psa->admin, 
(pubsub_endpoint_pt)arrayList_get(sub_ep_list,i));
-               }
-       }
+       free(manager);
 
-       hashMapIterator_destroy(subscriptionsIterator);
+       return status;
+}
 
-       celixThreadMutex_unlock(&manager->subscriptionsLock);
+void pubsub_topologyManager_psaAdded(void * handle, void *svc, const 
celix_properties_t *props __attribute__((unused))) {
+       pubsub_topology_manager_t *manager = handle;
+       pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc;
 
-       // Add already detected publications to new PSA
-       celixThreadMutex_lock(&manager->publicationsLock);
-       hash_map_iterator_pt publicationsIterator = 
hashMapIterator_create(manager->publications);
+       long svcId = celix_properties_getAsLong(props, 
OSGI_FRAMEWORK_SERVICE_ID, -1L);
+       logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Added 
PSA");
 
-       //TODO FIXME no matching used, should only add unmatched publications ?
-       //NOTE this is a bug which occurs when psa are started after bundles 
that uses the PSA
-       while (hashMapIterator_hasNext(publicationsIterator)) {
-               array_list_pt pub_ep_list = 
hashMapIterator_nextValue(publicationsIterator);
-               for(i=0;i<arrayList_size(pub_ep_list);i++){
-                       psa->addPublication(psa->admin, 
(pubsub_endpoint_pt)arrayList_get(pub_ep_list,i));
-               }
+       if (svcId >= 0) {
+               celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+               hashMap_put(manager->pubsubadmins.map, (void*)svcId, psa);
+               celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
        }
 
-       hashMapIterator_destroy(publicationsIterator);
-
-       celixThreadMutex_unlock(&manager->publicationsLock);
+       /* NOTE: for now it is assumed that PSA / PST and PSD are started before 
subscribers/publishers,
+        * so subscribers are not added retroactively.
+        *
+        * TODO future extension?
+        */
 }
 
-void pubsub_topologyManager_psaRemoved(void * handle, void *svc, const 
celix_properties_t *props  __attribute__((unused))) {
-       celix_status_t status = CELIX_SUCCESS;
+void pubsub_topologyManager_psaRemoved(void * handle, void *svc 
__attribute__((unused)), const celix_properties_t *props) {
        pubsub_topology_manager_t *manager = handle;
+       //pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc;
+       long svcId = celix_properties_getAsLong(props, 
OSGI_FRAMEWORK_SERVICE_ID, -1L);
 
-       pubsub_admin_service_pt psa = (pubsub_admin_service_pt) svc;
-
-       /* Deactivate all publications */
-       celixThreadMutex_lock(&manager->publicationsLock);
-
-       hash_map_iterator_pt pubit = 
hashMapIterator_create(manager->publications);
-       while(hashMapIterator_hasNext(pubit)){
-               hash_map_entry_pt pub_entry = hashMapIterator_nextEntry(pubit);
-               char* scope_topic_key = (char*)hashMapEntry_getKey(pub_entry);
-               // Extract scope/topic name from key
-               char scope[MAX_SCOPE_LEN];
-               char topic[MAX_TOPIC_LEN];
-               sscanf(scope_topic_key, "%[^:]:%s", scope, topic );
-               array_list_pt pubEP_list = 
(array_list_pt)hashMapEntry_getValue(pub_entry);
-
-               status = psa->closeAllPublications(psa->admin,scope,topic);
-
-               if(status==CELIX_SUCCESS){
-                       celixThreadMutex_lock(&manager->discoveryListLock);
-                       hash_map_iterator_pt iter = 
hashMapIterator_create(manager->discoveryList);
-                       while(hashMapIterator_hasNext(iter)){
-                               service_reference_pt disc_sr = 
(service_reference_pt)hashMapIterator_nextKey(iter);
-                               pubsub_announce_endpoint_listener_t *disc = 
NULL;
-                               bundleContext_getService(manager->context, 
disc_sr, (void**) &disc);
-                               const char* fwUUID = NULL;
-                               
bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-                               unsigned int i;
-                               for(i=0;i<arrayList_size(pubEP_list);i++){
-                                       pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
-                                       
if(strcmp(properties_get(pubEP->properties, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){
-                                               
disc->removeEndpoint(disc->handle,pubEP->properties);
-                                       }
-                               }
-                               bundleContext_ungetService(manager->context, 
disc_sr, NULL);
+       /* de-announce all publications */
+       celixThreadMutex_lock(&manager->announcedEndpoints.mutex);
+       celix_array_list_t *endpointsList = 
hashMap_remove(manager->announcedEndpoints.map, (void*)svcId);
+       celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
+
+       if (endpointsList != NULL) {
+               for (int i = 0; i < celix_arrayList_size(endpointsList); ++i) {
+                       celix_properties_t *endpoint = 
celix_arrayList_get(endpointsList, i);
+                       
celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+                       for (int j = 0; j < 
celix_arrayList_size(manager->announceEndpointListeners.list); ++j) {
+                               pubsub_announce_endpoint_listener_t *listener;
+                               listener = 
celix_arrayList_get(manager->announceEndpointListeners.list, j);
+                               listener->removeEndpoint(listener->handle, 
endpoint);
                        }
-                       hashMapIterator_destroy(iter);
-                       celixThreadMutex_unlock(&manager->discoveryListLock);
+                       
celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+                       celix_properties_destroy(endpoint);
                }
+               celix_arrayList_destroy(endpointsList);
        }
-       hashMapIterator_destroy(pubit);
-
-       celixThreadMutex_unlock(&manager->publicationsLock);
-
-       /* Deactivate all subscriptions */
-       celixThreadMutex_lock(&manager->subscriptionsLock);
-       hash_map_iterator_pt subit = 
hashMapIterator_create(manager->subscriptions);
-       while(hashMapIterator_hasNext(subit)){
-               // TODO do some error checking
-               char* scope_topic = (char*)hashMapIterator_nextKey(subit);
-               char scope[MAX_TOPIC_LEN];
-               char topic[MAX_TOPIC_LEN];
-               memset(scope, 0 , MAX_TOPIC_LEN*sizeof(char));
-               memset(topic, 0 , MAX_TOPIC_LEN*sizeof(char));
-               sscanf(scope_topic, "%[^:]:%s", scope, topic );
-               status += psa->closeAllSubscriptions(psa->admin,scope, topic);
-       }
-       hashMapIterator_destroy(subit);
-       celixThreadMutex_unlock(&manager->subscriptionsLock);
 
-       celixThreadMutex_lock(&manager->psaListLock);
-       arrayList_removeElement(manager->psaList, psa);
-       celixThreadMutex_unlock(&manager->psaListLock);
+       logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Removed 
PSA");
+}
 
-       logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Removed 
PSA");
+static void pstm_setupTopicReceiverCallback(void *handle, void *svc) {
+       pstm_topic_receiver_or_sender_entry_t *entry = handle;
+       pubsub_admin_service_t *psa = svc;
+       psa->setupTopicReciever(psa->handle, entry->scope, entry->topic, 
entry->selectedSerializerSvcId, &entry->endpoint);
 }
 
-void pubsub_topologyManager_subscriberAdded(void * handle, void *svc, const 
celix_properties_t *props, const celix_bundle_t *bnd) {
+void pubsub_topologyManager_subscriberAdded(void * handle, void *svc 
__attribute__((unused)), const celix_properties_t *props, const celix_bundle_t 
*bnd) {
        pubsub_topology_manager_t *manager = handle;
-       //subscriber_service_pt subscriber = (subscriber_service_pt)service;
+       //NOTE a new local subscriber service has been registered:
+       //1) First try to see if a TopicReceiver already exists for this 
subscriber; if found,
+       //2) update the usage count. If not found,
+       //3) create a new entry, find a matching psa and serializer and broadcast 
cond, so that the psaHandling thread will
+       //   call the psa to set up the topic receiver and announce the endpoint.
+
+       const char *topic = celix_properties_get(props, 
PUBSUB_SUBSCRIBER_TOPIC, NULL);
+       const char *scope = celix_properties_get(props, 
PUBSUB_SUBSCRIBER_SCOPE, "default");
+       if (topic == NULL) {
+               logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING,
+                                         "[PSTM] Warning found subscriber 
service without mandatory %s property.",
+                                         PUBSUB_SUBSCRIBER_TOPIC);
+               return;
+       }
 
-       pubsub_endpoint_pt sub = NULL;
-       if(pubsubEndpoint_createFromSvc(manager->context, bnd, props,false, 
&sub) == CELIX_SUCCESS) {
-               celixThreadMutex_lock(&manager->subscriptionsLock);
-               char *sub_key = 
pubsubEndpoint_createScopeTopicKey(properties_get(sub->properties, 
PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(sub->properties, 
PUBSUB_ENDPOINT_TOPIC_NAME));
+       long bndId = celix_bundle_getId(bnd);
+       char *scopeAndTopicKey = NULL;
+       scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic);
 
-               array_list_pt sub_list_by_topic = 
hashMap_get(manager->subscriptions,sub_key);
-               if(sub_list_by_topic==NULL){
-                       arrayList_create(&sub_list_by_topic);
-                       
hashMap_put(manager->subscriptions,strdup(sub_key),sub_list_by_topic);
-               }
-               free(sub_key);
-               arrayList_add(sub_list_by_topic,sub);
-
-               celixThreadMutex_unlock(&manager->subscriptionsLock);
-
-               unsigned int j;
-               double score = 0;
-               double best_score = 0;
-               pubsub_admin_service_pt best_psa = NULL;
-               celixThreadMutex_lock(&manager->psaListLock);
-               for(j=0;j<arrayList_size(manager->psaList);j++){
-                       pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-                       psa->matchEndpoint(psa->admin,sub,&score);
-                       if (score > best_score) { /* We have a new winner! */
-                               best_score = score;
-                               best_psa = psa;
+       celixThreadMutex_lock(&manager->topicReceivers.mutex);
+       pstm_topic_receiver_or_sender_entry_t *entry = 
hashMap_get(manager->topicReceivers.map, scopeAndTopicKey);
+       if (entry != NULL) {
+               entry->usageCount += 1;
+               free(scopeAndTopicKey);
+       }
+       celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+
+       if (entry == NULL) {
+               //new TopicReceiver needed -> matching for psa/serializer
+               entry = calloc(1, sizeof(*entry));
+               entry->scopeAndTopicKey = scopeAndTopicKey; //note: taking 
ownership
+               entry->selectedPsaSvcId = -1L;
+               entry->selectedSerializerSvcId = -1L;
+               entry->usageCount = 1;
+
+               double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
+               long serializerSvcId = -1L;
+               long selectedPsasvcId = -1L;
+
+               celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+               hash_map_iterator_t iter = 
hashMapIterator_construct(manager->pubsubadmins.map);
+               while (hashMapIterator_hasNext(&iter)) {
+                       hash_map_entry_t *mapEntry = 
hashMapIterator_nextEntry(&iter);
+                       long svcId = (long) hashMapEntry_getKey(mapEntry);
+                       pubsub_admin_service_t *psa = 
hashMapEntry_getValue(mapEntry);
+                       double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+                       long serSvcId = -1L;
+
+                       psa->matchSubscriber(psa->handle, bndId, props, &score, 
&serSvcId);
+                       if (score > highestScore) {
+                               highestScore = score;
+                               serializerSvcId = serSvcId;
+                               selectedPsasvcId = svcId;
                        }
                }
+               celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
 
-               if (best_psa != NULL && best_score>0) {
-                       best_psa->addSubscription(best_psa->admin,sub);
-               }
-
-               // Inform discoveries for interest in the topic
-               celixThreadMutex_lock(&manager->discoveryListLock);
-               hash_map_iterator_pt iter = 
hashMapIterator_create(manager->discoveryList);
-               while(hashMapIterator_hasNext(iter)){
-                       service_reference_pt disc_sr = 
(service_reference_pt)hashMapIterator_nextKey(iter);
-                       pubsub_announce_endpoint_listener_t *disc = NULL;
-                       bundleContext_getService(manager->context, disc_sr, 
(void**) &disc);
-                       disc->announceEndpoint(disc->handle, sub->properties);
-                       bundleContext_ungetService(manager->context, disc_sr, 
NULL);
-               }
-               hashMapIterator_destroy(iter);
-               celixThreadMutex_unlock(&manager->discoveryListLock);
-
-               celixThreadMutex_unlock(&manager->psaListLock);
-       }
-}
-
-
-void pubsub_topologyManager_subscriberRemoved(void * handle, void *svc, const 
celix_properties_t *props, const celix_bundle_t *bnd) {
-       pubsub_topology_manager_t *manager = handle;
-
-       pubsub_endpoint_pt subcmp = NULL;
-       if (pubsubEndpoint_createFromSvc(manager->context, bnd, props, false, 
&subcmp) == CELIX_SUCCESS){
-
-               unsigned int j,k;
+               if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
+                       entry->selectedPsaSvcId = selectedPsasvcId;
+                       entry->selectedSerializerSvcId = serializerSvcId;
 
-               // Inform discoveries that we not interested in the topic any 
more
-               celixThreadMutex_lock(&manager->discoveryListLock);
-               hash_map_iterator_pt iter = 
hashMapIterator_create(manager->discoveryList);
-               while(hashMapIterator_hasNext(iter)){
-                       service_reference_pt disc_sr = 
(service_reference_pt)hashMapIterator_nextKey(iter);
-                       pubsub_announce_endpoint_listener_t *disc = NULL;
-                       bundleContext_getService(manager->context, disc_sr, 
(void**) &disc);
-                       disc->removeEndpoint(disc->handle, subcmp->properties);
-                       bundleContext_ungetService(manager->context, disc_sr, 
NULL);
-               }
-               hashMapIterator_destroy(iter);
-               celixThreadMutex_unlock(&manager->discoveryListLock);
-
-               celixThreadMutex_lock(&manager->subscriptionsLock);
-               celixThreadMutex_lock(&manager->psaListLock);
-
-               char *sub_key = 
pubsubEndpoint_createScopeTopicKey(properties_get(subcmp->properties, 
PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(subcmp->properties, 
PUBSUB_ENDPOINT_TOPIC_NAME));
-               array_list_pt sub_list_by_topic = 
hashMap_get(manager->subscriptions,sub_key);
-               free(sub_key);
-               if(sub_list_by_topic!=NULL){
-                       for(j=0;j<arrayList_size(sub_list_by_topic);j++){
-                               pubsub_endpoint_pt sub = 
arrayList_get(sub_list_by_topic,j);
-                               if(pubsubEndpoint_equals(sub,subcmp)){
-                                       
for(k=0;k<arrayList_size(manager->psaList);k++){
-                                               /* No problem with invoking 
removal on all psa's, only the one that manage this topic will do something */
-                                               pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-                                               
psa->removeSubscription(psa->admin,sub);
-                                       }
+                       celix_bundleContext_useServiceWithId(manager->context, 
selectedPsasvcId, PUBSUB_ADMIN_SERVICE_NAME, entry,
+                                                                               
                 pstm_setupTopicReceiverCallback);
 
-                               }
-                               arrayList_remove(sub_list_by_topic,j);
+                       if (entry->endpoint != NULL) {
+                               entry->scope = 
celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+                               entry->scope = 
celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+                               entry->endpointUUID = 
celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL);
 
-                               /* If it was the last subscriber for this 
topic, tell PSA to close the ZMQ socket */
-                               if(arrayList_size(sub_list_by_topic)==0){
-                                       
for(k=0;k<arrayList_size(manager->psaList);k++){
-                                               pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-                                               
psa->closeAllSubscriptions(psa->admin, (char*) 
properties_get(subcmp->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) 
properties_get(subcmp->properties, PUBSUB_ENDPOINT_TOPIC_NAME));
-                                       }
+                               //announce new endpoint through the network
+                               
celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+                               for (int i = 0; i < 
celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+                                       pubsub_announce_endpoint_listener_t 
*listener = celix_arrayList_get(manager->announceEndpointListeners.list, i);
+                                       
listener->announceEndpoint(listener->handle, entry->endpoint);
                                }
-
-                               pubsubEndpoint_destroy(sub);
-
+                               
celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+
+                               //store topic receiver.
+                               //TODO race condition if multiple scope/topic 
combinations are requested -> broaden the lock?
+                               
celixThreadMutex_lock(&manager->topicReceivers.mutex);
+                               hashMap_put(manager->topicReceivers.map, 
entry->scopeAndTopicKey, entry);
+                               
celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+                       } else {
+                               free(entry->scopeAndTopicKey);
+                               free(entry);
+                               //ignore -> psa unregistered in meantime
                        }
                }
-
-               celixThreadMutex_unlock(&manager->psaListLock);
-               celixThreadMutex_unlock(&manager->subscriptionsLock);
-
-               pubsubEndpoint_destroy(subcmp);
-
        }
 }
 
-void pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, void *svc, 
const celix_properties_t *props) {
-       pubsub_topology_manager_t *manager = (pubsub_topology_manager_t 
*)handle;
-       pubsub_announce_endpoint_listener_t *disc = svc;
+void pubsub_topologyManager_subscriberRemoved(void * handle, void *svc 
__attribute__((unused)), const celix_properties_t *props, const celix_bundle_t 
*bnd) {
+       pubsub_topology_manager_t *manager = handle;
+       //NOTE local subscriber service unregister
+       //1) Find topic receiver and decrease count
 
-       const char* fwUUID = NULL;
+       const char *topic = celix_properties_get(props, 
PUBSUB_SUBSCRIBER_TOPIC, NULL);
+       const char *scope = celix_properties_get(props, 
PUBSUB_SUBSCRIBER_SCOPE, "default");
 
-       
bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
-       if(fwUUID==NULL){
-               printf("PSD: ERRROR: Cannot retrieve fwUUID.\n");
+       if (topic == NULL) {
                return;
        }
 
-       celixThreadMutex_lock(&manager->publicationsLock);
+       char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, 
topic);
+       celixThreadMutex_lock(&manager->topicReceivers.mutex);
+       pstm_topic_receiver_or_sender_entry_t *entry = 
hashMap_remove(manager->topicReceivers.map, scopeAndTopicKey);
+       if (entry != NULL) {
+               entry->usageCount -= 0;
+       }
+       celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+       free(scopeAndTopicKey);
 
-       celixThreadMutex_lock(&manager->discoveryListLock);
-       long svcId = celix_properties_getAsLong(props, 
OSGI_FRAMEWORK_SERVICE_ID, -1L);
-       hashMap_put(manager->discoveryList, (void*)svcId, NULL);
-       celixThreadMutex_unlock(&manager->discoveryListLock);
-
-       hash_map_iterator_pt iter = 
hashMapIterator_create(manager->publications);
-       while(hashMapIterator_hasNext(iter)){
-               array_list_pt pubEP_list = 
(array_list_pt)hashMapIterator_nextValue(iter);
-               for(unsigned int i = 0; i < arrayList_size(pubEP_list); i++) {
-                       pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pubEP_list,i);
-                       if( (strcmp(properties_get(pubEP->properties, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0)) {
-                               
disc->announceEndpoint(disc->handle,pubEP->properties);
-                       }
+       //NOTE not waking up psaHandling thread, topic receiver does not need 
to be removed immediately.
+}
+
+void pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void* handle, 
void *svc, const celix_properties_t *props __attribute__((unused))) {
+       pubsub_topology_manager_t *manager = (pubsub_topology_manager_t 
*)handle;
+       pubsub_announce_endpoint_listener_t *listener = svc;
+
+       //1) retroactively call announceEndpoint for already existing endpoints 
(manager->announcedEndpoints)
+       //2) Add listener to manager->announceEndpointListeners
+
+       celixThreadMutex_lock(&manager->announcedEndpoints.mutex);
+       hash_map_iterator_t iter = 
hashMapIterator_construct(manager->announcedEndpoints.map);
+       while (hashMapIterator_hasNext(&iter)) {
+               celix_array_list_t *endpoints = 
hashMapIterator_nextValue(&iter);
+               for (int i = 0; i < celix_arrayList_size(endpoints); ++i) {
+                       celix_properties_t *ep = celix_arrayList_get(endpoints, 
i);
+                       listener->announceEndpoint(listener->handle, ep);
                }
        }
-       hashMapIterator_destroy(iter);
-
-       celixThreadMutex_unlock(&manager->publicationsLock);
+       celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
 
-       celixThreadMutex_lock(&manager->subscriptionsLock);
-       iter = hashMapIterator_create(manager->subscriptions);
+       celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+       celix_arrayList_add(manager->announceEndpointListeners.list, listener);
+       celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+}
 
-       while(hashMapIterator_hasNext(iter)) {
-               array_list_pt l = 
(array_list_pt)hashMapIterator_nextValue(iter);
-               unsigned int i;
-               for(i=0;i<arrayList_size(l);i++){
-                       pubsub_endpoint_pt subEp = 
(pubsub_endpoint_pt)arrayList_get(l,i);
 
-                       disc->announceEndpoint(disc->handle, subEp->properties);
+void pubsub_topologyManager_pubsubAnnounceEndpointListenerRemoved(void * 
handle, void *svc, const celix_properties_t *props __attribute__((unused))) {
+       pubsub_topology_manager_t *manager = (pubsub_topology_manager_t 
*)handle;
+       pubsub_announce_endpoint_listener_t *listener = svc;
+
+       //1) Remove listener from manager->announceEndpointListeners
+       //2) call removeEndpoint for already existing endpoints 
(manager->announcedEndpoints)
+
+       celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+       celix_arrayList_remove(manager->announceEndpointListeners.list, 
listener);
+       celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+
+       celixThreadMutex_lock(&manager->announcedEndpoints.mutex);
+       hash_map_iterator_t iter = 
hashMapIterator_construct(manager->announcedEndpoints.map);
+       while (hashMapIterator_hasNext(&iter)) {
+               celix_array_list_t *endpoints = 
hashMapIterator_nextValue(&iter);
+               for (int i = 0; i < celix_arrayList_size(endpoints); ++i) {
+                       celix_properties_t *ep = celix_arrayList_get(endpoints, 
i);
+                       listener->removeEndpoint(listener->handle, ep);
                }
        }
-       hashMapIterator_destroy(iter);
-       celixThreadMutex_unlock(&manager->subscriptionsLock);
+       celixThreadMutex_unlock(&manager->announcedEndpoints.mutex);
 }
 
 
-void pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, void *svc, 
const celix_properties_t *props) {
-       pubsub_topology_manager_t *manager = handle;
+static void pstm_setupTopicSenderCallback(void *handle, void *svc) {
+       pstm_topic_receiver_or_sender_entry_t *entry = handle;
+       pubsub_admin_service_t *psa = svc;
+       psa->setupTopicSender(psa->handle, entry->scope, entry->topic, 
entry->selectedSerializerSvcId, &entry->endpoint);
+}
 
-       celixThreadMutex_lock(&manager->discoveryListLock);
+void pubsub_topologyManager_publisherTrackerAdded(void *handle, const 
celix_service_tracker_info_t *info) {
+       pubsub_topology_manager_t *manager = handle;
 
+       //NOTE new local subscriber service register
+       //1) First trying to see if a TopicReceiver already exists for this 
subscriber, if found
+       //2) update the usage count. if not found
+       //3) Try to find a matching psa and create a new TopicReceiver.
 
-       long svcId = celix_properties_getAsLong(props, 
OSGI_FRAMEWORK_SERVICE_ID, -1L);
-       if (hashMap_remove(manager->discoveryList, (void*)svcId)) {
-               logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, 
"EndpointListener Removed");
+       //TODO FIXME
+       if (strcmp(info->serviceName, PUBSUB_PUBLISHER_SERVICE_NAME) != 0) {
+           logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Bug. 
trackServiceTracker should only trigger for %s. Now triggering on %s", 
PUBSUB_PUBLISHER_SERVICE_NAME, info->serviceName);
+           return;
        }
 
-       celixThreadMutex_unlock(&manager->discoveryListLock);
-}
-
-static void tm_callAnnounce(void *handle, void *svc) {
-    pubsub_endpoint_t *pub = handle;
-    pubsub_announce_endpoint_listener_t *listener = svc;
-    listener->announceEndpoint(listener->handle, pub->properties);
-}
-
-void pubsub_topologyManager_publisherTrackerAdded(void *handle, const 
celix_service_tracker_info_t *info) {
-       pubsub_topology_manager_t *manager = handle;
 
-       pubsub_endpoint_pt pub = NULL;
-       celix_status_t status = 
pubsubEndpoint_createFromListenerHookInfo(manager->context, info, true, &pub);
-       if (status == CELIX_SUCCESS) {
+       char *topic = NULL;
+       char *scopeFromFilter = NULL;
+       pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topic, 
&scopeFromFilter);
+       const char *scope = scopeFromFilter == NULL ? "default" : 
scopeFromFilter;
 
-               celixThreadMutex_lock(&manager->publicationsLock);
-               char *pub_key = 
pubsubEndpoint_createScopeTopicKey(properties_get(pub->properties, 
PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pub->properties, 
PUBSUB_ENDPOINT_TOPIC_NAME));
-               array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications, pub_key);
-               if(pub_list_by_topic==NULL){
-                       arrayList_create(&pub_list_by_topic);
-                       
hashMap_put(manager->publications,pub_key,pub_list_by_topic);
-               } else {
-                   free(pub_key);
-        }
-               arrayList_add(pub_list_by_topic,pub);
-
-               celixThreadMutex_unlock(&manager->publicationsLock);
-
-               unsigned int j;
-               double score = 0;
-               double best_score = 0;
-               pubsub_admin_service_pt best_psa = NULL;
-               celixThreadMutex_lock(&manager->psaListLock);
-
-               int size = celix_arrayList_size(manager->psaList);
-               for (j=0; j<size; j++) {
-                       pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-                       psa->matchEndpoint(psa->admin,pub,&score);
-                       if(score>best_score){ /* We have a new winner! */
-                               best_score = score;
-                               best_psa = psa;
-                       }
+       char *scopeAndTopicKey = NULL;
+       if (topic == NULL) {
+               logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING,
+                       "[PSTM] Warning found publisher service request without 
mandatory '%s' filter attribute.", PUBSUB_SUBSCRIBER_TOPIC);
+       } else {
+               scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, 
topic);
+               celixThreadMutex_lock(&manager->topicSenders.mutex);
+               pstm_topic_receiver_or_sender_entry_t *entry = 
hashMap_get(manager->topicSenders.map, scopeAndTopicKey);
+               if (entry != NULL) {
+                       entry->usageCount += 1;
                }
+               celixThreadMutex_unlock(&manager->topicSenders.mutex);
+
+               if (entry == NULL) {
+                       //new topic receiver needed, requesting match with 
current psa
+                       double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
+                       long serializerSvcId = -1L;
+                       long selectedPsasvcId = -1L;
+
+                       celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+                       hash_map_iterator_t iter = 
hashMapIterator_construct(manager->pubsubadmins.map);
+                       while (hashMapIterator_hasNext(&iter)) {
+                               hash_map_entry_t *entry = 
hashMapIterator_nextEntry(&iter);
+                               long svcId = (long)hashMapEntry_getKey(entry);
+                               pubsub_admin_service_t *psa = 
hashMapEntry_getValue(entry);
+                               double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+                               long serSvcId = -1L;
+                               psa->matchPublisher(psa->handle, 
info->bundleId, info->filter, &score, &serSvcId);
+                               if (score > highestScore) {
+                                       highestScore = score;
+                                       serializerSvcId = serSvcId;
+                                       selectedPsasvcId = svcId;
+                               }
+                       }
+                       celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+
+                       if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) {
+                               entry = calloc(1, sizeof(*entry));
+                               entry->scopeAndTopicKey = scopeAndTopicKey;
+                               entry->usageCount = 1;
+                               entry->selectedPsaSvcId = selectedPsasvcId;
+                               entry->selectedSerializerSvcId = 
serializerSvcId;
+                               entry->topic = topic; //NOTE tmp
+                               entry->scope = scope; //NOTE tmp
+
+                               
celix_bundleContext_useServiceWithId(manager->context, selectedPsasvcId, 
PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_setupTopicSenderCallback);
+
+                               if (entry->endpoint != NULL) {
+                                       //note psa->setupTopicSender has 
created the endpoint.
+                                       entry->scope = 
celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
+                                       entry->topic = 
celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
+                                       entry->endpointUUID = 
celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL);
+                               } else {
+                                       free(entry->scopeAndTopicKey);
+                                       free(entry);
+                                       entry = NULL;
+                                       //ignore -> psa unregistered in meantime
+                               }
 
-               if (best_psa != NULL && best_score > 0) {
-                       celix_status_t status = 
best_psa->addPublication(best_psa->admin,pub);
-                       if(status==CELIX_SUCCESS){
-                               
celixThreadMutex_lock(&manager->discoveryListLock);
-                               hash_map_iterator_t iter = 
hashMapIterator_construct(manager->discoveryList);
-                               while(hashMapIterator_hasNext(&iter)) {
-                                   long svcId = 
(long)hashMapIterator_nextKey(&iter);
-                                   
celix_bundleContext_useServiceWithId(manager->context, svcId, 
PUBSUB_ANNOUNCE_ENDPOINT_LISTENER_SERVICE, pub, tm_callAnnounce);
+                               if (entry != NULL) {
+                                       //announce new endpoint through the 
network
+                                       
celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+                                       for (int i = 0; i < 
celix_arrayList_size(manager->announceEndpointListeners.list); ++i) {
+                                               
pubsub_announce_endpoint_listener_t *listener = 
celix_arrayList_get(manager->announceEndpointListeners.list, i);
+                                               
listener->announceEndpoint(listener->handle, entry->endpoint);
+                                       }
+                                       
celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+
+                                       //store topic sender.
+                                       
celixThreadMutex_lock(&manager->topicSenders.mutex);
+                                       hashMap_put(manager->topicSenders.map, 
entry->scopeAndTopicKey, entry);
+                                       
celixThreadMutex_unlock(&manager->topicSenders.mutex);
+
+                                       const char *adminType = 
celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+                                       const char *serType = 
celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+                                       logHelper_log(manager->loghelper, 
OSGI_LOGSERVICE_DEBUG,
+                                                                 "[PSTM] 
setting up new TopicSender for scope/topic %s/%s with psa admin type %s and 
serializer %s\n",
+                                                                 entry->scope, 
entry->topic, adminType, serType);
                                }
-                               
celixThreadMutex_unlock(&manager->discoveryListLock);
                        }
+               } else {
+                       free(scopeAndTopicKey);
                }
-
-               celixThreadMutex_unlock(&manager->psaListLock);
-
        }
+       free(scopeFromFilter);
 }
 
-
 void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const 
celix_service_tracker_info_t *info) {
        pubsub_topology_manager_t *manager = handle;
 
-       pubsub_endpoint_pt pubcmp = NULL;
-       if(pubsubEndpoint_createFromListenerHookInfo(manager->context, info, 
true, &pubcmp) == CELIX_SUCCESS){
-               unsigned int j,k;
-               celixThreadMutex_lock(&manager->psaListLock);
-               celixThreadMutex_lock(&manager->publicationsLock);
-
-               char *pub_key = 
pubsubEndpoint_createScopeTopicKey(properties_get(pubcmp->properties, 
PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubcmp->properties, 
PUBSUB_ENDPOINT_TOPIC_NAME));
-               array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications,pub_key);
-               if(pub_list_by_topic!=NULL){
-                       for(j=0;j<arrayList_size(pub_list_by_topic);j++){
-                               pubsub_endpoint_pt pub = 
arrayList_get(pub_list_by_topic,j);
-                               if(pubsubEndpoint_equals(pub,pubcmp)){
-                                       
for(k=0;k<arrayList_size(manager->psaList);k++){
-                                               pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-                                               celix_status_t  status = 
psa->removePublication(psa->admin,pub);
-                                               if(status==CELIX_SUCCESS){ /* 
We found the one that manages this endpoint */
-                                                       
celixThreadMutex_lock(&manager->discoveryListLock);
-                                                       hash_map_iterator_pt 
iter = hashMapIterator_create(manager->discoveryList);
-                                                       
while(hashMapIterator_hasNext(iter)){
-                                                               
service_reference_pt disc_sr = 
(service_reference_pt)hashMapIterator_nextKey(iter);
-                                                               
pubsub_announce_endpoint_listener_t *disc = NULL;
-                                                               
bundleContext_getService(manager->context, disc_sr, (void**) &disc);
-                                                               
disc->removeEndpoint(disc->handle,pub->properties);
-                                                               
bundleContext_ungetService(manager->context, disc_sr, NULL);
-                                                       }
-                                                       
hashMapIterator_destroy(iter);
-                                                       
celixThreadMutex_unlock(&manager->discoveryListLock);
-                                               }
-                                               else if(status ==  
CELIX_ILLEGAL_ARGUMENT){ /* Not a real error, just saying this psa does not 
handle this endpoint */
-                                                       status = CELIX_SUCCESS;
-                                               }
-                                       }
-                                       //}
-                                       arrayList_remove(pub_list_by_topic,j);
-
-                                       /* If it was the last publisher for 
this topic, tell PSA to close the ZMQ socket and then inform the discovery */
-                                       
if(arrayList_size(pub_list_by_topic)==0){
-                                               
for(k=0;k<arrayList_size(manager->psaList);k++){
-                                                       pubsub_admin_service_pt 
psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k);
-                                                       
psa->closeAllPublications(psa->admin, (char*) properties_get(pub->properties, 
PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(pub->properties, 
PUBSUB_ENDPOINT_TOPIC_NAME));
-                                               }
-                                       }
-
-                                       pubsubEndpoint_destroy(pub);
-                               }
+       //NOTE local subscriber service unregister
+       //1) Find topic sender and decrease count
 
-                       }
-               }
+    //TODO FIXME
+    if (strcmp(info->serviceName, PUBSUB_PUBLISHER_SERVICE_NAME) != 0) {
+        logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Bug. 
trackServiceTracker should only trigger for %s. Now triggering on %s", 
PUBSUB_PUBLISHER_SERVICE_NAME, info->serviceName);
+        return;
+    }
 
-               celixThreadMutex_unlock(&manager->publicationsLock);
-               celixThreadMutex_unlock(&manager->psaListLock);
+       char *topic = NULL;
+       char *scopeFromFilter = NULL;
+       pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topic, 
&scopeFromFilter);
+       const char *scope = scopeFromFilter == NULL ? "default" : 
scopeFromFilter;
 
-               free(pub_key);
+       if (topic == NULL) {
+               free(scopeFromFilter);
+               return;
+       }
 
-               pubsubEndpoint_destroy(pubcmp);
 
+       char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, 
topic);
+       celixThreadMutex_lock(&manager->topicSenders.mutex);
+       pstm_topic_receiver_or_sender_entry_t *entry = 
hashMap_get(manager->topicSenders.map, scopeAndTopicKey);
+       if (entry != NULL) {
+               entry->usageCount -= 1;
        }
+       celixThreadMutex_unlock(&manager->topicSenders.mutex);
+
+       free(scopeAndTopicKey);
+       free(scopeFromFilter);
+}
+
+static void pstm_addEndpointCallback(void *handle, void *svc) {
+       celix_properties_t *endpoint = handle;
+       pubsub_admin_service_t *psa = svc;
+       psa->addEndpoint(psa->handle, endpoint);
 }
 
-static celix_status_t pubsub_topologyManager_addDiscoveredPublisher(void 
*handle, const celix_properties_t *pubProperties){
+celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, 
const celix_properties_t *endpoint){
        celix_status_t status = CELIX_SUCCESS;
     pubsub_topology_manager_t *manager = handle;
 
-    const char *topic = celix_properties_get(pubProperties, 
PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
-       const char *scope = celix_properties_get(pubProperties, 
PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
-       const char *fwUid = celix_properties_get(pubProperties, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL);
-       const char *uuid = celix_properties_get(pubProperties, 
PUBSUB_ENDPOINT_UUID, NULL);
+    const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, 
NULL);
+    assert(uuid != NULL); //discovery should check if endpoint is valid -> 
pubsubEndoint_isValid.
 
+    // 1) See if endpoint is already discovered, if so increase usage count.
+    // 1) If not, find matching psa using the matchEndpoint
+    // 2) if found call addEndpoint of the matching psa
 
-    if (manager->verbose) {
-        printf("PSTM: New publisher discovered for scope/topic %s/%s 
[fwUUID=%s, epUUID=%s]\n",
-               scope, topic, fwUid, uuid);
-    }
-
-
-       celixThreadMutex_lock(&manager->psaListLock);
-       celixThreadMutex_lock(&manager->publicationsLock);
+       if (manager->verbose) {
+               logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
+                                         "PSTM: Discovered endpoint added for 
topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
+                                         celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_NAME, NULL),
+                                         celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL),
+                                         celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),
+                                         uuid);
+       }
 
-       char *pub_key = pubsubEndpoint_createScopeTopicKey(scope, topic);
 
-       array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications,pub_key);
-       if(pub_list_by_topic==NULL){
-               arrayList_create(&pub_list_by_topic);
-               
hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic);
+       celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+       pstm_discovered_endpoint_entry_t *entry = 
hashMap_get(manager->discoveredEndpoints.map, uuid);
+       if (entry != NULL) {
+               //already existing endpoint -> increase usage
+               entry->usageCount += 1;
        }
-       free(pub_key);
-
-       /* Shouldn't be any other duplicate, since it's filtered out by the 
discovery */
-       pubsub_endpoint_pt p = NULL;
-       pubsubEndpoint_createFromProperties(pubProperties, &p);
-       arrayList_add(pub_list_by_topic , p);
-
-       unsigned int j;
-       double score = 0;
-       double best_score = 0;
-       pubsub_admin_service_pt best_psa = NULL;
-
-       for(j=0;j<arrayList_size(manager->psaList);j++){
-               pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,j);
-               psa->matchEndpoint(psa->admin , p, &score);
-               if (score>best_score) { /* We have a new winner! */
-                       best_score = score;
-                       best_psa = psa;
+       celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+
+       if (entry == NULL) {
+
+               //new endpoint -> new entry
+               entry = calloc(1, sizeof(*entry));
+               entry->usageCount = 1;
+               entry->endpoint = celix_properties_copy(endpoint);
+               entry->uuid = celix_properties_get(entry->endpoint, 
PUBSUB_ENDPOINT_UUID, NULL);
+               entry->selectedPsaSvcId = -1L;
+
+               double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE;
+               long psaSvcId = -1L;
+
+               celixThreadMutex_lock(&manager->pubsubadmins.mutex);
+               hash_map_iterator_t iter = 
hashMapIterator_construct(manager->pubsubadmins.map);
+               while (hashMapIterator_hasNext(&iter)) {
+                       hash_map_entry_t *mapEntry = 
hashMapIterator_nextEntry(&iter);
+                       pubsub_admin_service_t *psa = 
hashMapEntry_getValue(mapEntry);
+                       long svcId = (long) hashMapEntry_getKey(mapEntry);
+                       double score = PUBSUB_ADMIN_NO_MATCH_SCORE;
+                       psa->matchEndpoint(psa->handle, endpoint, &score);
+                       if (score > highestScore) {
+                               highestScore = score;
+                               psaSvcId = svcId;
+                       }
+               }
+               celixThreadMutex_unlock(&manager->pubsubadmins.mutex);
+
+               if (psaSvcId >= 0) {
+                       //psa called outside of mutex, this means the it can 
happen that addEndpointCallback is not called.
+                       //for now this is expected behaviour;
+                       //You need to start the pubsub admin stuff before the 
bundles using pubsub.
+                       celix_bundleContext_useServiceWithId(manager->context, 
psaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
+                                                                               
                 (void *) endpoint, pstm_addEndpointCallback);
+               } else {
+                       logHelper_log(manager->loghelper, 
OSGI_LOGSERVICE_DEBUG, "Cannot find psa for endpoint %s\n", entry->uuid);
                }
-       }
 
-       if(best_psa != NULL && best_score>0) {
-        //TODO FIXME this the same call as used by publisher of service 
trackers. This is confusing.
-        //remote discovered publication can be handle different.
-               best_psa->addPublication(best_psa->admin,p);
-       }
-       else{
-               status = CELIX_ILLEGAL_STATE;
+               entry->selectedPsaSvcId = psaSvcId;
+               celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+               hashMap_put(manager->discoveredEndpoints.map, 
(void*)entry->uuid, entry);
+               celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
        }
 
-       celixThreadMutex_unlock(&manager->publicationsLock);
-       celixThreadMutex_unlock(&manager->psaListLock);
+    return status;
+}
 
-       return status;
+static void pstm_removeEndpointCallback(void *handle, void *svc) {
+       celix_properties_t *endpoint = handle;
+       pubsub_admin_service_t *psa = svc;
+       psa->removeEndpoint(psa->handle, endpoint);
 }
 
-static celix_status_t pubsub_topologyManager_removeDiscoveredPublisher(void 
*handle, const celix_properties_t *props) {
+celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, 
const celix_properties_t *endpoint) {
     pubsub_topology_manager_t *manager = handle;
 
-    if (manager->verbose) {
-        printf("PSTM: Publisher removed for topic %s with scope %s [fwUUID=%s, 
epUUID=%s]\n",
-                          celix_properties_get(props, 
PUBSUB_ENDPOINT_TOPIC_NAME, NULL),
-               celix_properties_get(props, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL),
-                          celix_properties_get(props, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),
-                          celix_properties_get(props, PUBSUB_ENDPOINT_UUID, 
NULL));
-    }
+       // 1) See if endpoint is already discovered, if so decrease usage count.
+       // 2) If usage count becomes 0, remove the entry from the discovered endpoints map.
+       // 3) If a psa was selected for the entry, call removeEndpoint of that psa.
 
-       celixThreadMutex_lock(&manager->psaListLock);
-       celixThreadMutex_lock(&manager->publicationsLock);
-       unsigned int i;
+       const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, 
NULL);
+       assert(uuid != NULL); //discovery should check if endpoint is valid -> 
pubsubEndoint_isValid.
 
-       char *pub_key = 
pubsubEndpoint_createScopeTopicKey(celix_properties_get(props, 
PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL), celix_properties_get(props, 
PUBSUB_ENDPOINT_TOPIC_NAME, NULL));
-       array_list_pt pub_list_by_topic = 
hashMap_get(manager->publications,pub_key);
-       if(pub_list_by_topic==NULL){
-               printf("PSTM: ERROR: Cannot find topic for known endpoint 
[%s,%s,%s]. Something is inconsistent.\n",pub_key,celix_properties_get(props, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),celix_properties_get(props, "pubsub.url", 
NULL));
+       if (manager->verbose) {
+               logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
+                                         "PSTM: Discovered endpoint removed 
for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n",
+                                         celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_NAME, NULL),
+                                         celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL),
+                                         celix_properties_get(endpoint, 
PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),
+                                         uuid);
        }
-       else{
-
-               pubsub_endpoint_pt p = NULL;
-               bool found = false;
 
-               for(i=0;!found && i<arrayList_size(pub_list_by_topic);i++){
-                       p = 
(pubsub_endpoint_pt)arrayList_get(pub_list_by_topic,i);
-                       found = pubsubEndpoint_equalsWithProperties(p,props);
+       celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+       pstm_discovered_endpoint_entry_t *entry = 
hashMap_get(manager->discoveredEndpoints.map, uuid);
+       if (entry != NULL) {
+               //already existing endpoint -> decrease usage
+               entry->usageCount-= 1;
+               if (entry->usageCount <= 0) {
+                       hashMap_remove(manager->discoveredEndpoints.map, 
entry->uuid);
+               } else {
+                       entry = NULL; //still used (usage count > 0) -> do 
nothing
+               }
+       }
+       celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+
+       if (entry != NULL) {
+               //note entry is removed from manager->discoveredEndpoints, also 
inform used psa
+               if (entry->selectedPsaSvcId >= 0) {
+                       //note that it is possible that the psa is already 
gone, in that case the call is also not needed anymore.
+                       celix_bundleContext_useServiceWithId(manager->context, 
entry->selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
+                                                                               
                 (void *) endpoint, pstm_removeEndpointCallback);
+               } else {
+                       logHelper_log(manager->loghelper, 
OSGI_LOGSERVICE_DEBUG, "No selected psa for endpoint %s\n", entry->uuid);
                }
+               celix_properties_destroy(entry->endpoint);
+               free(entry);
+       }
 
-               if(found && p !=NULL){
 
-                       for(i=0;i<arrayList_size(manager->psaList);i++){
-                               pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
-                               /* No problem with invoking removal on all 
psa's, only the one that manage this topic will do something */
-                               psa->removePublication(psa->admin,p);
-                       }
+       return CELIX_SUCCESS;
+}
 
-                       arrayList_removeElement(pub_list_by_topic,p);
 
-                       /* If it was the last publisher for this topic, tell 
PSA to close the ZMQ socket */
-                       if(arrayList_size(pub_list_by_topic)==0){
+// Callback for celix_bundleContext_useServiceWithId: asks the used pubsub admin to
+// tear down the topic sender for the scope/topic of the entry passed as handle.
+static void pstm_teardownTopicSenderCallback(void *handle, void *svc) {
+    pubsub_admin_service_t *admin = svc;
+    pstm_topic_receiver_or_sender_entry_t *senderEntry = handle;
+    admin->teardownTopicSender(admin->handle, senderEntry->scope, senderEntry->topic);
+}
 
-                               for(i=0;i<arrayList_size(manager->psaList);i++){
-                                       pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,i);
-                                       psa->closeAllPublications(psa->admin, 
(char*) celix_properties_get(p->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL), 
(char*) celix_properties_get(p->properties, PUBSUB_ENDPOINT_TOPIC_NAME, NULL));
-                               }
-                       }
+// Sweeps the topicSenders map and tears down every sender with a usage count <= 0.
+// For each such entry: remove it from the map, inform all announce endpoint listeners
+// that the endpoint is removed, ask the selected psa to tear down the topic sender
+// and finally free the entry. The topicSenders mutex is held during the whole sweep.
+static void pstm_teardownTopicSenders(pubsub_topology_manager_t *manager) {
+    celixThreadMutex_lock(&manager->topicSenders.mutex);
+    hash_map_iterator_t senderIter = hashMapIterator_construct(manager->topicSenders.map);
+    while (hashMapIterator_hasNext(&senderIter)) {
+        pstm_topic_receiver_or_sender_entry_t *sender = hashMapIterator_nextValue(&senderIter);
+        if (sender == NULL || sender->usageCount > 0) {
+            continue; //no entry or still used -> nothing to tear down
+        }
+
+        //unused sender -> take it out of the map before cleaning it up
+        hashMapIterator_remove(&senderIter);
+        if (manager->verbose) {
+            const char *psaType = celix_properties_get(sender->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+            const char *serializerType = celix_properties_get(sender->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+            logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
+                          "[PSTM] Tearing down TopicSender for scope/topic %s/%s with psa admin type %s and serializer %s\n",
+                          sender->scope, sender->topic, psaType, serializerType);
+        }
+
+        //first withdraw the announced endpoint ...
+        celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+        for (int idx = 0; idx < celix_arrayList_size(manager->announceEndpointListeners.list); ++idx) {
+            pubsub_announce_endpoint_listener_t *announcer = celix_arrayList_get(manager->announceEndpointListeners.list, idx);
+            announcer->removeEndpoint(announcer->handle, sender->endpoint);
+        }
+        celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+
+        //... then let the selected psa tear down the sender
+        celix_bundleContext_useServiceWithId(manager->context, sender->selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
+                                             sender, pstm_teardownTopicSenderCallback);
+
+        //cleanup entry
+        free(sender->scopeAndTopicKey);
+        celix_properties_destroy(sender->endpoint);
+        free(sender);
+    }
+    celixThreadMutex_unlock(&manager->topicSenders.mutex);
+}
 
-                       pubsubEndpoint_destroy(p);
-               }
+// Callback for celix_bundleContext_useServiceWithId: asks the used pubsub admin to
+// tear down the topic receiver for the scope/topic of the entry passed as handle.
+static void pstm_teardownTopicReceiverCallback(void *handle, void *svc) {
+    pstm_topic_receiver_or_sender_entry_t *entry = handle;
+    pubsub_admin_service_t *psa = svc;
+    //receiver entries must be torn down through the receiver call of the psa, not the sender call
+    psa->teardownTopicReceiver(psa->handle, entry->scope, entry->topic);
+}
+
+// Sweeps the topicReceivers map and tears down every receiver with a usage count <= 0.
+// For each such entry: remove it from the map, ask the selected psa to tear it down,
+// inform all announce endpoint listeners that the endpoint is removed and finally
+// free the entry. The topicReceivers mutex is held during the whole sweep.
+static void pstm_teardownTopicReceivers(pubsub_topology_manager_t *manager) {
+    celixThreadMutex_lock(&manager->topicReceivers.mutex);
+    hash_map_iterator_t receiverIter = hashMapIterator_construct(manager->topicReceivers.map);
+    while (hashMapIterator_hasNext(&receiverIter)) {
+        pstm_topic_receiver_or_sender_entry_t *receiver = hashMapIterator_nextValue(&receiverIter);
+        if (receiver == NULL || receiver->usageCount > 0) {
+            continue; //no entry or still used -> nothing to tear down
+        }
+
+        //unused receiver -> take it out of the map before cleaning it up
+        hashMapIterator_remove(&receiverIter);
+
+        if (manager->verbose) {
+            const char *psaType = celix_properties_get(receiver->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+            const char *serializerType = celix_properties_get(receiver->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+            logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG,
+                          "[PSTM] Tearing down TopicReceiver for scope/topic %s/%s with psa admin type %s and serializer %s\n",
+                          receiver->scope, receiver->topic, psaType, serializerType);
+        }
+
+        //first let the selected psa tear down the receiver ...
+        celix_bundleContext_useServiceWithId(manager->context, receiver->selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME,
+                                             receiver, pstm_teardownTopicReceiverCallback);
+
+        //... then withdraw the announced endpoint
+        celixThreadMutex_lock(&manager->announceEndpointListeners.mutex);
+        for (int idx = 0; idx < celix_arrayList_size(manager->announceEndpointListeners.list); ++idx) {
+            pubsub_announce_endpoint_listener_t *announcer = celix_arrayList_get(manager->announceEndpointListeners.list, idx);
+            announcer->removeEndpoint(announcer->handle, receiver->endpoint);
+        }
+        celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
+
+        //cleanup entry
+        free(receiver->scopeAndTopicKey);
+        celix_properties_destroy(receiver->endpoint);
+        free(receiver);
+    }
+    celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+}
 
+// Thread function of the psa handling thread; data is the pubsub_topology_manager_t.
+// As long as psaHandling.running is true it tears down the unused topic senders and
+// receivers and then waits on psaHandling.cond for at most
+// PSTM_CLEANUP_SLEEPTIME_IN_SECONDS seconds. Always returns NULL.
+static void* pstm_psaHandlingThread(void *data) {
+    pubsub_topology_manager_t *manager = data;
 
-       }
-       free(pub_key);
-       celixThreadMutex_unlock(&manager->publicationsLock);
-       celixThreadMutex_unlock(&manager->psaListLock);
+    //the running flag is only read while holding psaHandling.mutex
+    celixThreadMutex_lock(&manager->psaHandling.mutex);
+    bool running = manager->psaHandling.running;
+    celixThreadMutex_unlock(&manager->psaHandling.mutex);
 
+    while (running) {
+        pstm_teardownTopicSenders(manager);
+        pstm_teardownTopicReceivers(manager);
 
-       return CELIX_SUCCESS;
+        //timed wait on the condition; presumably the condition is signalled on stop so the thread wakes up early - confirm in the stop code
+        celixThreadMutex_lock(&manager->psaHandling.mutex);
+        celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, 
&manager->psaHandling.mutex, PSTM_CLEANUP_SLEEPTIME_IN_SECONDS, 0L);
+        running = manager->psaHandling.running;
+        celixThreadMutex_unlock(&manager->psaHandling.mutex);
+    }
+    return NULL;
 }
 
-celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, 
const celix_properties_t *properties) {
-       const char *type = celix_properties_get(properties, 
PUBSUB_ENDPOINT_TYPE, NULL);
-       if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, 
strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
-               return pubsub_topologyManager_addDiscoveredPublisher(handle, 
properties);
-       } else if (type != NULL && strncmp(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, 
type, strlen(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE)) == 0) {
-               //nop //TODO add subscription to pubsub admins
-       } else {
-               fprintf(stderr, "Invalid endpoint. Endpoint has no endpoint 
type (key: %s)\n", PUBSUB_ENDPOINT_TYPE);
-       }
-       return CELIX_SUCCESS;
-}
 
-celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, 
const celix_properties_t *properties) {
-       const char *type = celix_properties_get(properties, 
PUBSUB_ENDPOINT_TYPE, NULL);
-       if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, 
strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) {
-               pubsub_topologyManager_removeDiscoveredPublisher(handle, 
properties);
-       } else if (type != NULL && strncmp(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, 
type, strlen(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE)) == 0) {
-               //nop //TODO remove subscription from pubsub admins
-       } else {
-               fprintf(stderr, "Invalid endpoint. Endpoint has no endpoint 
type (key: %s)\n", PUBSUB_ENDPOINT_TYPE);
+// Shell command callback: prints the discovered endpoints, the active topic senders and
+// the active topic receivers of the topology manager (handle) to the stream os.
+// commandLine and errorStream are not used. Each map is iterated while holding its own
+// mutex; extra details (svc ids, usage count) are printed when manager->verbose is set.
+// Always returns CELIX_SUCCESS.
+celix_status_t pubsub_topologyManager_shellCommand(void *handle, char * commandLine __attribute__((unused)), FILE *os, FILE *errorStream __attribute__((unused))) {
+       pubsub_topology_manager_t *manager = handle;
+       //TODO add support for searching based on scope and topic
+
+       fprintf(os, "\n");
+
+       fprintf(os, "Discovered Endpoints: \n");
+       celixThreadMutex_lock(&manager->discoveredEndpoints.mutex);
+       hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map);
+       while (hashMapIterator_hasNext(&iter)) {
+               pstm_discovered_endpoint_entry_t *discovered = hashMapIterator_nextValue(&iter);
+               const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!");
+               const char *topic = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!");
+               const char *adminType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+               const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+               fprintf(os, "|- Discovered Endpoint %s:\n", discovered->uuid);
+               fprintf(os, "   |- scope       = %s\n", scope);
+               fprintf(os, "   |- topic       = %s\n", topic);
+               fprintf(os, "   |- admin type  = %s\n", adminType);
+               fprintf(os, "   |- serializer  = %s\n", serType);
+               if (manager->verbose) {
+                       fprintf(os, "   |- psa svc id  = %li\n", discovered->selectedPsaSvcId);
+                       fprintf(os, "   |- usage count = %i\n", discovered->usageCount);
+               }
+       }
+       celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex);
+       fprintf(os,"\n");
+
+
+       fprintf(os, "Active Topic Senders:\n");
+       celixThreadMutex_lock(&manager->topicSenders.mutex);
+       iter = hashMapIterator_construct(manager->topicSenders.map);
+       while (hashMapIterator_hasNext(&iter)) {
+               pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+               const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+               const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+               fprintf(os, "|- Topic Sender for endpoint %s:\n", entry->endpointUUID);
+               fprintf(os, "   |- scope       = %s\n", entry->scope);
+               fprintf(os, "   |- topic       = %s\n", entry->topic);
+               fprintf(os, "   |- admin type  = %s\n", adminType);
+               fprintf(os, "   |- serializer  = %s\n", serType);
+               if (manager->verbose) {
+                       fprintf(os, "   |- psa svc id  = %li\n", entry->selectedPsaSvcId);
+                       fprintf(os, "   |- ser svc id  = %li\n", entry->selectedSerializerSvcId);
+                       fprintf(os, "   |- usage count = %i\n", entry->usageCount);
+               }
        }
+       celixThreadMutex_unlock(&manager->topicSenders.mutex);
+       fprintf(os,"\n");
+
+       fprintf(os, "Active Topic Receivers:\n");
+       celixThreadMutex_lock(&manager->topicReceivers.mutex);
+       iter = hashMapIterator_construct(manager->topicReceivers.map);
+       while (hashMapIterator_hasNext(&iter)) {
+               pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter);
+               const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!");
+               const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!");
+               fprintf(os, "|- Topic Receiver for endpoint %s:\n", entry->endpointUUID);
+               fprintf(os, "   |- scope       = %s\n", entry->scope);
+               fprintf(os, "   |- topic       = %s\n", entry->topic);
+               fprintf(os, "   |- admin type  = %s\n", adminType);
+               fprintf(os, "   |- serializer  = %s\n", serType);
+               if (manager->verbose) {
+                       //same 3-space tree indent as the lines above, so the verbose details line up
+                       fprintf(os, "   |- psa svc id  = %li\n", entry->selectedPsaSvcId);
+                       fprintf(os, "   |- ser svc id  = %li\n", entry->selectedSerializerSvcId);
+                       fprintf(os, "   |- usage count = %i\n", entry->usageCount);
+               }
+       }
+       celixThreadMutex_unlock(&manager->topicReceivers.mutex);
+       fprintf(os,"\n");
+
        return CELIX_SUCCESS;
 }
-
-

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
----------------------------------------------------------------------
diff --git 
a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h 
b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index dda84a0..105b797 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -36,31 +36,68 @@
 #define PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE                false
 
 
-struct pubsub_topology_manager {
+typedef struct pubsub_topology_manager {
        bundle_context_pt context;
 
-       celix_thread_mutex_t psaListLock;
-       array_list_pt psaList;
-
-       celix_thread_mutex_t discoveryListLock;
-       hash_map_pt discoveryList; //<svcId,NULL>
-
-       celix_thread_mutex_t publicationsLock;
-       hash_map_pt publications; //<topic(string),list<pubsub_ep>>
-
-       celix_thread_mutex_t subscriptionsLock;
-       hash_map_pt subscriptions; //<topic(string),list<pubsub_ep>>
-
-       command_service_t shellCmdService;
-       service_registration_pt  shellCmdReg;
-
+       struct {
+               celix_thread_mutex_t mutex;
+               hash_map_t *map; //key = svcId, value = pubsub_admin_t*
+       } pubsubadmins;
+
+       struct {
+               celix_thread_mutex_t mutex;
+               hash_map_t *map; //key = psa svc id, value = 
list<celix_properties_t /*endpoint*/>
+       } announcedEndpoints;
+
+       struct {
+               celix_thread_mutex_t mutex;
+               hash_map_t *map; //key = uuid , value = 
pstm_discovered_endpoint_entry_t
+       } discoveredEndpoints;
+
+       struct {
+               celix_thread_mutex_t mutex;
+               hash_map_t *map; //key = scope/topic key, value = 
pstm_topic_receiver_or_sender_entry_t*
+       } topicReceivers;
+
+       struct {
+               celix_thread_mutex_t mutex;
+               hash_map_t *map; //key = scope/topic key, value = 
pstm_topic_receiver_or_sender_entry_t*
+       } topicSenders;
+
+       struct {
+               celix_thread_mutex_t mutex;
+               celix_array_list_t *list; 
//<pubsub_announce_endpoint_listener_t*>
+       } announceEndpointListeners;
+
+       struct {
+               celix_thread_t thread;
+               celix_thread_mutex_t mutex; //protect running and condition
+               celix_thread_cond_t cond;
+               bool running;
+       } psaHandling;
 
        log_helper_pt loghelper;
 
        bool verbose;
-};
-
-typedef struct pubsub_topology_manager pubsub_topology_manager_t;
+} pubsub_topology_manager_t;
+
+// Entry for an endpoint reported by pubsub discovery; stored in discoveredEndpoints.map.
+typedef struct pstm_discovered_endpoint_entry {
+       const char *uuid; //key of the entry in discoveredEndpoints.map; not freed separately when the entry is removed
+       long selectedPsaSvcId; //svc id of the psa the endpoint was added to, < 0 if no psa was found
+       int usageCount; //note that discovered endpoints can be found multiple times by different pubsub discovery components
+       celix_properties_t *endpoint; //endpoint properties; destroyed when the entry is removed
+} pstm_discovered_endpoint_entry_t;
+
+// Entry for a topic sender or topic receiver; stored in topicSenders.map / topicReceivers.map.
+typedef struct pstm_topic_receiver_or_sender_entry {
+       char *scopeAndTopicKey; //key of the combined value of the scope and topic; freed on teardown
+       celix_properties_t *endpoint; //endpoint properties announced for this sender/receiver; destroyed on teardown
+       const char *topic; //not freed on teardown; presumably points into endpoint - confirm where the entry is created
+       const char *scope; //not freed on teardown; presumably points into endpoint - confirm where the entry is created
+       const char *endpointUUID; //not freed on teardown; presumably points into endpoint - confirm where the entry is created
+       int usageCount; //nr of subscriber service for the topic receiver (matching scope & topic); entries with a count <= 0 are torn down
+       long selectedPsaSvcId; //svc id used to look up the psa that tears down the sender/receiver
+       long selectedSerializerSvcId;
+} pstm_topic_receiver_or_sender_entry_t;
 
 celix_status_t pubsub_topologyManager_create(bundle_context_pt context, 
log_helper_pt logHelper, pubsub_topology_manager_t **manager);
 celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t 
*manager);
@@ -69,8 +106,8 @@ celix_status_t 
pubsub_topologyManager_closeImports(pubsub_topology_manager_t *ma
 void pubsub_topologyManager_psaAdded(void *handle, void *svc, const 
celix_properties_t *props);
 void pubsub_topologyManager_psaRemoved(void *handle, void *svc, const 
celix_properties_t *props);
 
-void pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, void *svc, 
const celix_properties_t *props);
-void pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, void *svc, 
const celix_properties_t *props);
+void pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void* handle, 
void *svc, const celix_properties_t *props);
+void pubsub_topologyManager_pubsubAnnounceEndpointListenerRemoved(void * 
handle, void *svc, const celix_properties_t *props);
 
 void pubsub_topologyManager_subscriberAdded(void * handle, void *svc, const 
celix_properties_t *props, const celix_bundle_t *bnd);
 void pubsub_topologyManager_subscriberRemoved(void * handle, void *svc, const 
celix_properties_t *props, const celix_bundle_t *bnd);
@@ -81,4 +118,6 @@ void pubsub_topologyManager_publisherTrackerRemoved(void 
*handle, const celix_se
 celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, 
const celix_properties_t *properties);
 celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, 
const celix_properties_t *properties);
 
+celix_status_t pubsub_topologyManager_shellCommand(void *handle, char * 
commandLine, FILE *outStream, FILE *errorStream);
+
 #endif /* PUBSUB_TOPOLOGY_MANAGER_H_ */

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/framework/include/celix_api.h
----------------------------------------------------------------------
diff --git a/libs/framework/include/celix_api.h 
b/libs/framework/include/celix_api.h
index 5129a65..b941a78 100644
--- a/libs/framework/include/celix_api.h
+++ b/libs/framework/include/celix_api.h
@@ -21,32 +21,26 @@
 #define CELIX_CELIX_API_H_
 
 #include "properties.h"
-
 #include "array_list.h"
-#include "celix_array_list.h"
-
 #include "constants.h"
+#include "bundle.h"
+#include "bundle_context.h"
+#include "framework.h"
 
+#include "celix_properties.h"
+#include "celix_array_list.h"
+//#include "celix_constants.h"
 #include "celix_utils_api.h"
-
-#include "bundle.h"
 #include "celix_bundle.h"
-
-#include "bundle_context.h"
 #include "celix_bundle_context.h"
 
-#include "service_registration.h"
-#include "service_factory.h"
-#include "service_reference.h"
-#include "service_tracker.h"
-#include "service_tracker_customizer.h"
-#include "listener_hook_service.h"
-
-#include "framework.h"
+#include "celix_framework.h"
 #include "celix_framework_factory.h"
 #include "celix_launcher.h"
 
 #include "dm_dependency_manager.h"
 #include "dm_service_dependency.h"
 
+#include "celix_bundle_activator.h"
+
 #endif //CELIX_CELIX_API_H_

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/framework/include/celix_bundle_context.h
----------------------------------------------------------------------
diff --git a/libs/framework/include/celix_bundle_context.h 
b/libs/framework/include/celix_bundle_context.h
index 5bb6faa..76af9a5 100644
--- a/libs/framework/include/celix_bundle_context.h
+++ b/libs/framework/include/celix_bundle_context.h
@@ -357,6 +357,7 @@ typedef struct celix_service_tracker_options {
     .filter.versionRange = NULL, \
     .filter.filter = NULL, \
     .filter.serviceLanguage = NULL, \
+    .filter.ignoreServiceLanguage = false, \
     .callbackHandle = NULL, \
     .set = NULL, \
     .add = NULL, \
@@ -797,6 +798,16 @@ const char* 
celix_bundleContext_getProperty(celix_bundle_context_t *ctx, const c
 long celix_bundleContext_getPropertyAsLong(celix_bundle_context_t *ctx, const 
char *key, long defaultValue);
 
 /**
+ * Gets the config property and converts it to double. If the property is not a valid double, the defaultValue will be returned.
+ * The rest of the behaviour is the same as celix_bundleContext_getProperty.
+ *
+ * @param ctx The bundle context.
+ * @param key The key of the property to receive.
+ * @param defaultValue The default value to use if the property is not found or is not a valid double.
+ * @return The property value for the provided key or the provided defaultValue if the key is not found.
+ */
+double celix_bundleContext_getPropertyAsDouble(celix_bundle_context_t *ctx, 
const char *key, double defaultValue);
+
+/**
  * Gets the config property as converts it to bool. If the property is not a 
valid bool, the defaultValue will be returned.
  * The rest of the behaviour is the same as celix_bundleContext_getProperty.
 

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/framework/src/bundle_context.c
----------------------------------------------------------------------
diff --git a/libs/framework/src/bundle_context.c 
b/libs/framework/src/bundle_context.c
index b8b1552..1e86f2a 100644
--- a/libs/framework/src/bundle_context.c
+++ b/libs/framework/src/bundle_context.c
@@ -1038,6 +1038,20 @@ long 
celix_bundleContext_getPropertyAsLong(celix_bundle_context_t *ctx, const ch
     return result;
 }
 
+// Looks up the property for key and converts it with strtod. Returns defaultValue when the
+// property is not found, when no characters could be converted or when strtod sets errno.
+double celix_bundleContext_getPropertyAsDouble(celix_bundle_context_t *ctx, const char *key, double defaultValue) {
+    const char *text = celix_bundleContext_getProperty(ctx, key, NULL);
+    if (text == NULL) {
+        return defaultValue; //no such property
+    }
+
+    char *parseEnd = NULL;
+    errno = 0; //reset so a stale errno is not mistaken for a conversion error
+    double parsed = strtod(text, &parseEnd);
+    return (parseEnd != text && errno == 0) ? parsed : defaultValue;
+}
+
 
 bool celix_bundleContext_getPropertyAsBool(celix_bundle_context_t *ctx, const 
char *key, bool defaultValue) {
     bool result = defaultValue;

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/framework/src/framework.c
----------------------------------------------------------------------
diff --git a/libs/framework/src/framework.c b/libs/framework/src/framework.c
index 4059e70..b0d505d 100644
--- a/libs/framework/src/framework.c
+++ b/libs/framework/src/framework.c
@@ -2822,7 +2822,7 @@ void celix_framework_useBundle(framework_t *fw, bool 
onlyActive, long bundleId,
         bundle_t *bnd = framework_getBundleById(fw, bundleId);
         if (bnd != NULL) {
             celix_bundle_state_e bndState = celix_bundle_getState(bnd);
-            if (onlyActive && bndState == OSGI_FRAMEWORK_BUNDLE_ACTIVE) {
+            if (onlyActive && (bndState == OSGI_FRAMEWORK_BUNDLE_ACTIVE || 
bndState == OSGI_FRAMEWORK_BUNDLE_STARTING)) {
                 use(callbackHandle, bnd);
             } else if (!onlyActive) {
                 use(callbackHandle, bnd);

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/utils/include/celix_properties.h
----------------------------------------------------------------------
diff --git a/libs/utils/include/celix_properties.h 
b/libs/utils/include/celix_properties.h
index e325222..f6e871b 100644
--- a/libs/utils/include/celix_properties.h
+++ b/libs/utils/include/celix_properties.h
@@ -60,12 +60,16 @@ void celix_properties_unset(celix_properties_t *properties, 
const char *key);
 celix_properties_t* celix_properties_copy(const celix_properties_t 
*properties);
 
 long celix_properties_getAsLong(const celix_properties_t *props, const char 
*key, long defaultValue);
-
 void celix_properties_setLong(celix_properties_t *props, const char *key, long 
value);
 
 bool celix_properties_getAsBool(celix_properties_t *props, const char *key, 
bool defaultValue);
 void celix_properties_setBool(celix_properties_t *props, const char *key, bool 
val);
 
+
+void celix_properties_setDouble(celix_properties_t *props, const char *key, 
double val);
+double celix_properties_getAsDouble(const celix_properties_t *props, const 
char *key, double defaultValue);
+
+
 #define CELIX_PROPERTIES_FOR_EACH(props, key) \
     for(hash_map_iterator_t iter = hashMapIterator_construct(props); \
         hashMapIterator_hasNext(&iter), (key) = (const 
char*)hashMapIterator_nextKey(&iter);)

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/utils/include/celix_threads.h
----------------------------------------------------------------------
diff --git a/libs/utils/include/celix_threads.h 
b/libs/utils/include/celix_threads.h
index a9a3049..bf6e1f6 100644
--- a/libs/utils/include/celix_threads.h
+++ b/libs/utils/include/celix_threads.h
@@ -54,6 +54,11 @@ static const celix_thread_t celix_thread_default = {0, 0};
 celix_status_t
 celixThread_create(celix_thread_t *new_thread, celix_thread_attr_t *attr, 
celix_thread_start_t func, void *data);
 
+/**
+ * If supported by the platform sets the name of the thread.
+ */
+void celixThread_setName(celix_thread_t *thread, const char *threadName);
+
 void celixThread_exit(void *exitStatus);
 
 celix_status_t celixThread_detach(celix_thread_t thread);
@@ -123,7 +128,7 @@ celix_status_t 
celixThreadCondition_destroy(celix_thread_cond_t *condition);
 
 celix_status_t celixThreadCondition_wait(celix_thread_cond_t *cond, 
celix_thread_mutex_t *mutex);
 
-celix_status_t celixThreadCondition_timedwait(celix_thread_cond_t *cond, 
celix_thread_mutex_t *mutex, long seconds, long nanoseconds);
+celix_status_t celixThreadCondition_timedwaitRelative(celix_thread_cond_t 
*cond, celix_thread_mutex_t *mutex, long seconds, long nanoseconds);
 
 celix_status_t celixThreadCondition_broadcast(celix_thread_cond_t *cond);
 

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/utils/src/celix_threads.c
----------------------------------------------------------------------
diff --git a/libs/utils/src/celix_threads.c b/libs/utils/src/celix_threads.c
index d8a8091..7bfc37d 100644
--- a/libs/utils/src/celix_threads.c
+++ b/libs/utils/src/celix_threads.c
@@ -24,6 +24,7 @@
  *  \copyright  Apache License, Version 2.0
  */
 #include <stdlib.h>
+#include <sys/time.h>
 #include "signal.h"
 #include "celix_threads.h"
 
@@ -41,6 +42,16 @@ celix_status_t celixThread_create(celix_thread_t 
*new_thread, celix_thread_attr_
        return status;
 }
 
+// Sets the name of the given thread. When _GNU_SOURCE is defined this is done with
+// pthread_setname_np, otherwise the call does nothing.
+#ifdef _GNU_SOURCE
+void celixThread_setName(celix_thread_t *thread, const char *threadName) {
+       pthread_setname_np(thread->thread, threadName);
+}
+#else
+void celixThread_setName(celix_thread_t *thread __attribute__((unused)), const char *threadName  __attribute__((unused))) {
+       //nop
+}
+#endif
+
 // Returns void, since pthread_exit does exit the thread and never returns.
 void celixThread_exit(void *exitStatus) {
     pthread_exit(exitStatus);
@@ -143,10 +154,11 @@ celix_status_t 
celixThreadCondition_wait(celix_thread_cond_t *cond, celix_thread
     return pthread_cond_wait(cond, mutex);
 }
 
-celix_status_t celixThreadCondition_timedwait(celix_thread_cond_t *cond, celix_thread_mutex_t *mutex, long seconds, long nanoseconds) {
+// Waits on cond (with mutex held by the caller) for at most the given time relative to now.
+// The absolute deadline is the current CLOCK_REALTIME time plus seconds and nanoseconds.
+// Returns the result of pthread_cond_timedwait.
+celix_status_t celixThreadCondition_timedwaitRelative(celix_thread_cond_t *cond, celix_thread_mutex_t *mutex, long seconds, long nanoseconds) {
        struct timespec time;
-       time.tv_sec = seconds;
-       time.tv_nsec = nanoseconds;
+       clock_gettime(CLOCK_REALTIME, &time);
+       time.tv_sec += seconds;
+       time.tv_nsec += nanoseconds;
+       //pthread_cond_timedwait fails with EINVAL if tv_nsec is outside [0, 1000000000) -> carry the overflow into tv_sec
+       time.tv_sec += time.tv_nsec / 1000000000L;
+       time.tv_nsec %= 1000000000L;
+       if (time.tv_nsec < 0) {
+               time.tv_sec -= 1;
+               time.tv_nsec += 1000000000L;
+       }
        return pthread_cond_timedwait(cond, mutex, &time);
 }
 

http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/utils/src/properties.c
----------------------------------------------------------------------
diff --git a/libs/utils/src/properties.c b/libs/utils/src/properties.c
index 480a056..191dfd8 100644
--- a/libs/utils/src/properties.c
+++ b/libs/utils/src/properties.c
@@ -394,13 +394,37 @@ long celix_properties_getAsLong(const celix_properties_t 
*props, const char *key
 }
 
 void celix_properties_setLong(celix_properties_t *props, const char *key, long 
value) {
-       char buf[32]; //should be enough to store long long int
-       int writen = snprintf(buf, 32, "%li", value);
-       if (writen <= 31) {
-               celix_properties_set(props, key, buf);
-       } else {
-               fprintf(stderr,"buf to small for value '%li'\n", value);
+    char buf[32]; //should be enough to store long long int
+    int writen = snprintf(buf, 32, "%li", value);
+    if (writen <= 31) {
+        celix_properties_set(props, key, buf);
+    } else {
+        fprintf(stderr,"buf to small for value '%li'\n", value);
+    }
+}
+
+// Looks up the property for key and converts it with strtod. Returns defaultValue when the
+// property is not found, when no characters could be converted or when strtod sets errno.
+double celix_properties_getAsDouble(const celix_properties_t *props, const char *key, double defaultValue) {
+       const char *text = celix_properties_get(props, key, NULL);
+       if (text == NULL) {
+               return defaultValue; //no such property
        }
+       char *parseEnd = NULL;
+       errno = 0; //reset so a stale errno is not mistaken for a conversion error
+       double parsed = strtod(text, &parseEnd);
+       return (parseEnd != text && errno == 0) ? parsed : defaultValue;
+}
+
+// Stores val under key as its "%f" text representation.
+// If formatting fails or the text does not fit the buffer, nothing is stored and a message
+// is printed on stderr.
+void celix_properties_setDouble(celix_properties_t *props, const char *key, double val) {
+    //"%f" of the largest finite double needs 318 bytes (sign, 309 digits, '.', 6 decimals, terminator)
+    char buf[512];
+    int writen = snprintf(buf, sizeof(buf), "%f", val);
+    if (writen >= 0 && (size_t)writen < sizeof(buf)) {
+        celix_properties_set(props, key, buf);
+    } else {
+        fprintf(stderr,"buf to small for value '%f'\n", val);
+    }
 }
 
 bool celix_properties_getAsBool(celix_properties_t *props, const char *key, 
bool defaultValue) {

Reply via email to