Repository: celix Updated Branches: refs/heads/feature/CELIX-454-pubsub-disc e30a70f72 -> 69596cfdf
http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c index 834e3a8..85c67e9 100644 --- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c +++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c @@ -16,24 +16,19 @@ *specific language governing permissions and limitations *under the License. */ -/* - * pubsub_topology_manager.c - * - * \date Sep 29, 2011 - * \author <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a> - * \copyright Apache License, Version 2.0 - */ + #include <stdio.h> #include <stdlib.h> #include <string.h> #include <stdbool.h> #include <celix_api.h> +#include <pubsub_utils.h> +#include <assert.h> #include "hash_map.h" -#include "array_list.h" -#include "bundle_context.h" +#include "celix_array_list.h" +#include "celix_bundle_context.h" #include "constants.h" -#include "listener_hook_service.h" #include "utils.h" #include "log_service.h" #include "log_helper.h" @@ -42,697 +37,761 @@ #include "pubsub_topology_manager.h" #include "pubsub_admin.h" -static void print_endpoint_info(hash_map_pt endpoints, FILE *outStream) { - for(hash_map_iterator_t iter = hashMapIterator_construct(endpoints); hashMapIterator_hasNext(&iter);) { - const char* key = (const char*)hashMapIterator_nextKey(&iter); - fprintf(outStream, " Topic=%s\n", key); - array_list_pt ep_list = hashMap_get(endpoints, key); - for(unsigned int i = 0; i < arrayList_size(ep_list); ++i) { - pubsub_endpoint_pt ep = arrayList_get(ep_list, i); - fprintf(outStream, " Endpoint %d\n", i); - fprintf(outStream, " Endpoint properties\n"); - const char *propKey; - if(ep->properties) { - PROPERTIES_FOR_EACH(ep->properties, propKey) { - fprintf(outStream, " %s => %s\n", propKey, celix_properties_get(ep->properties, propKey, NULL)); - } - } - } - } +#define PSTM_CLEANUP_SLEEPTIME_IN_SECONDS 5L -} +static void* pstm_psaHandlingThread(void *data); -static celix_status_t shellCommand(void *handle, char * commandLine, FILE *outStream, FILE *errorStream) { - pubsub_topology_manager_t *manager = (pubsub_topology_manager_t*) handle; - if (manager->publications && !hashMap_isEmpty(manager->publications)) { - fprintf(outStream, "Publications:\n"); - print_endpoint_info(manager->publications, outStream); - } - if (manager->subscriptions && !hashMap_isEmpty(manager->subscriptions)) { - fprintf(outStream, "Subscriptions:\n"); - print_endpoint_info(manager->subscriptions, outStream); - } - return CELIX_SUCCESS; -} - -celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_t **manager) { +celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_t **out) { celix_status_t status = CELIX_SUCCESS; - *manager = calloc(1, sizeof(**manager)); - if (!*manager) { + pubsub_topology_manager_t *manager = calloc(1, sizeof(*manager)); + if (manager == NULL) { + *out = NULL; return CELIX_ENOMEM; + } else { + *out = manager; } - (*manager)->context = context; + manager->context = context; celix_thread_mutexattr_t psaAttr; celixThreadMutexAttr_create(&psaAttr); celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE); - status |= celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr); + status |= celixThreadMutex_create(&manager->pubsubadmins.mutex, &psaAttr); celixThreadMutexAttr_destroy(&psaAttr); - status |= celixThreadMutex_create(&(*manager)->publicationsLock, NULL); - status |= celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL); - status |= celixThreadMutex_create(&(*manager)->discoveryListLock, NULL); + status |= celixThreadMutex_create(&manager->announcedEndpoints.mutex, NULL); + status |= celixThreadMutex_create(&manager->discoveredEndpoints.mutex, NULL); + status |= celixThreadMutex_create(&manager->announceEndpointListeners.mutex, NULL); + status |= celixThreadMutex_create(&manager->topicReceivers.mutex, NULL); + status |= celixThreadMutex_create(&manager->topicSenders.mutex, NULL); + status |= celixThreadMutex_create(&manager->psaHandling.mutex, NULL); - arrayList_create(&(*manager)->psaList); + status |= celixThreadCondition_init(&manager->psaHandling.cond, NULL); - (*manager)->discoveryList = hashMap_create(NULL, NULL, NULL, NULL); - (*manager)->publications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - (*manager)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + manager->announcedEndpoints.map = hashMap_create(NULL, NULL, NULL, NULL); + manager->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + manager->announceEndpointListeners.list = celix_arrayList_create(); + manager->pubsubadmins.map = hashMap_create(NULL, NULL, NULL, NULL); + manager->topicReceivers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + manager->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); - (*manager)->loghelper = logHelper; - (*manager)->shellCmdService.handle = *manager; - (*manager)->shellCmdService.executeCommand = shellCommand; + manager->loghelper = logHelper; + manager->verbose = celix_bundleContext_getPropertyAsBool(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE); - (*manager)->verbose = PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE; - const char *verboseStr = NULL; - bundleContext_getProperty(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, &verboseStr); - if (verboseStr != NULL) { - (*manager)->verbose = strncasecmp("true", verboseStr, strlen("true")) == 0; - } + manager->psaHandling.running = true; + celixThread_create(&manager->psaHandling.thread, NULL, pstm_psaHandlingThread, manager); + celixThread_setName(&manager->psaHandling.thread, "PubSub TopologyManager"); - properties_pt shellProps = properties_create(); - properties_set(shellProps, OSGI_SHELL_COMMAND_NAME, "ps_info"); - properties_set(shellProps, OSGI_SHELL_COMMAND_USAGE, "ps_info"); - properties_set(shellProps, OSGI_SHELL_COMMAND_DESCRIPTION, "ps_info: Overview of PubSub"); - bundleContext_registerService(context, OSGI_SHELL_COMMAND_SERVICE_NAME, &((*manager)->shellCmdService), shellProps, &((*manager)->shellCmdReg)); return status; } celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager) { celix_status_t status = CELIX_SUCCESS; - celixThreadMutex_lock(&manager->discoveryListLock); - hashMap_destroy(manager->discoveryList, false, false); - celixThreadMutex_unlock(&manager->discoveryListLock); - celixThreadMutex_destroy(&manager->discoveryListLock); - - celixThreadMutex_lock(&manager->psaListLock); - arrayList_destroy(manager->psaList); - celixThreadMutex_unlock(&manager->psaListLock); - celixThreadMutex_destroy(&manager->psaListLock); - - celixThreadMutex_lock(&manager->publicationsLock); - hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications); - while(hashMapIterator_hasNext(pubit)){ - array_list_pt l = (array_list_pt)hashMapIterator_nextValue(pubit); - unsigned int i; - for(i=0;i<arrayList_size(l);i++){ - pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i)); - } - arrayList_destroy(l); - } - hashMapIterator_destroy(pubit); - hashMap_destroy(manager->publications, true, false); - celixThreadMutex_unlock(&manager->publicationsLock); - celixThreadMutex_destroy(&manager->publicationsLock); - - celixThreadMutex_lock(&manager->subscriptionsLock); - hash_map_iterator_pt subit = hashMapIterator_create(manager->subscriptions); - while(hashMapIterator_hasNext(subit)){ - array_list_pt l = (array_list_pt)hashMapIterator_nextValue(subit); - unsigned int i; - for(i=0;i<arrayList_size(l);i++){ - pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i)); - } - arrayList_destroy(l); - } - hashMapIterator_destroy(subit); - hashMap_destroy(manager->subscriptions, true, false); - celixThreadMutex_unlock(&manager->subscriptionsLock); - celixThreadMutex_destroy(&manager->subscriptionsLock); - serviceRegistration_unregister(manager->shellCmdReg); - free(manager); + celixThreadMutex_lock(&manager->psaHandling.mutex); + manager->psaHandling.running = false; + celixThreadCondition_broadcast(&manager->psaHandling.cond); + celixThreadMutex_unlock(&manager->psaHandling.mutex); + celixThread_join(manager->psaHandling.thread, NULL); - return status; -} -void pubsub_topologyManager_psaAdded(void * handle, void *svc, const celix_properties_t *props __attribute__((unused))) { - pubsub_topology_manager_t *manager = handle; - unsigned int i; + celixThreadMutex_lock(&manager->announcedEndpoints.mutex); + hashMap_destroy(manager->announcedEndpoints.map, false, false); + celixThreadMutex_unlock(&manager->announcedEndpoints.mutex); + celixThreadMutex_destroy(&manager->announcedEndpoints.mutex); - pubsub_admin_service_pt psa = (pubsub_admin_service_pt) svc; - logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added PSA"); + celixThreadMutex_lock(&manager->pubsubadmins.mutex); + hashMap_destroy(manager->pubsubadmins.map, false, false); + celixThreadMutex_unlock(&manager->pubsubadmins.mutex); + celixThreadMutex_destroy(&manager->pubsubadmins.mutex); - celixThreadMutex_lock(&manager->psaListLock); - arrayList_add(manager->psaList, psa); - celixThreadMutex_unlock(&manager->psaListLock); + celixThreadMutex_lock(&manager->discoveredEndpoints.mutex); + hashMap_destroy(manager->discoveredEndpoints.map, true, false); + celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex); + celixThreadMutex_destroy(&manager->discoveredEndpoints.mutex); - // Add already detected subscriptions to new PSA - celixThreadMutex_lock(&manager->subscriptionsLock); - hash_map_iterator_pt subscriptionsIterator = hashMapIterator_create(manager->subscriptions); - - //TODO FIXME no matching used, should only add unmatched subscribers ? - //NOTE this is a bug which occurs when psa are started after bundles that uses the PSA - while (hashMapIterator_hasNext(subscriptionsIterator)) { - array_list_pt sub_ep_list = hashMapIterator_nextValue(subscriptionsIterator); - for(i=0;i<arrayList_size(sub_ep_list);i++){ - psa->addSubscription(psa->admin, (pubsub_endpoint_pt)arrayList_get(sub_ep_list,i)); - } - } + free(manager); - hashMapIterator_destroy(subscriptionsIterator); + return status; +} - celixThreadMutex_unlock(&manager->subscriptionsLock); +void pubsub_topologyManager_psaAdded(void * handle, void *svc, const celix_properties_t *props __attribute__((unused))) { + pubsub_topology_manager_t *manager = handle; + pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc; - // Add already detected publications to new PSA - celixThreadMutex_lock(&manager->publicationsLock); - hash_map_iterator_pt publicationsIterator = hashMapIterator_create(manager->publications); + long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L); + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Added PSA"); - //TODO FIXME no matching used, should only add unmatched publications ? - //NOTE this is a bug which occurs when psa are started after bundles that uses the PSA - while (hashMapIterator_hasNext(publicationsIterator)) { - array_list_pt pub_ep_list = hashMapIterator_nextValue(publicationsIterator); - for(i=0;i<arrayList_size(pub_ep_list);i++){ - psa->addPublication(psa->admin, (pubsub_endpoint_pt)arrayList_get(pub_ep_list,i)); - } + if (svcId >= 0) { + celixThreadMutex_lock(&manager->pubsubadmins.mutex); + hashMap_put(manager->pubsubadmins.map, (void*)svcId, psa); + celixThreadMutex_unlock(&manager->pubsubadmins.mutex); } - hashMapIterator_destroy(publicationsIterator); - - celixThreadMutex_unlock(&manager->publicationsLock); + /* NOTE for now it assumed PSA / PST and PSD are started before subscribers/publisher + * so no retroactively adding subscribers + * + * TODO future extension? + */ } -void pubsub_topologyManager_psaRemoved(void * handle, void *svc, const celix_properties_t *props __attribute__((unused))) { - celix_status_t status = CELIX_SUCCESS; +void pubsub_topologyManager_psaRemoved(void * handle, void *svc __attribute__((unused)), const celix_properties_t *props) { pubsub_topology_manager_t *manager = handle; + //pubsub_admin_service_t *psa = (pubsub_admin_service_t*) svc; + long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L); - pubsub_admin_service_pt psa = (pubsub_admin_service_pt) svc; - - /* Deactivate all publications */ - celixThreadMutex_lock(&manager->publicationsLock); - - hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications); - while(hashMapIterator_hasNext(pubit)){ - hash_map_entry_pt pub_entry = hashMapIterator_nextEntry(pubit); - char* scope_topic_key = (char*)hashMapEntry_getKey(pub_entry); - // Extract scope/topic name from key - char scope[MAX_SCOPE_LEN]; - char topic[MAX_TOPIC_LEN]; - sscanf(scope_topic_key, "%[^:]:%s", scope, topic ); - array_list_pt pubEP_list = (array_list_pt)hashMapEntry_getValue(pub_entry); - - status = psa->closeAllPublications(psa->admin,scope,topic); - - if(status==CELIX_SUCCESS){ - celixThreadMutex_lock(&manager->discoveryListLock); - hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList); - while(hashMapIterator_hasNext(iter)){ - service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); - pubsub_announce_endpoint_listener_t *disc = NULL; - bundleContext_getService(manager->context, disc_sr, (void**) &disc); - const char* fwUUID = NULL; - bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); - unsigned int i; - for(i=0;i<arrayList_size(pubEP_list);i++){ - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i); - if(strcmp(properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0){ - disc->removeEndpoint(disc->handle,pubEP->properties); - } - } - bundleContext_ungetService(manager->context, disc_sr, NULL); + /* de-announce all publications */ + celixThreadMutex_lock(&manager->announcedEndpoints.mutex); + celix_array_list_t *endpointsList = hashMap_remove(manager->announcedEndpoints.map, (void*)svcId); + celixThreadMutex_unlock(&manager->announcedEndpoints.mutex); + + if (endpointsList != NULL) { + for (int i = 0; i < celix_arrayList_size(endpointsList); ++i) { + celix_properties_t *endpoint = celix_arrayList_get(endpointsList, i); + celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); + for (int j = 0; j < celix_arrayList_size(manager->announceEndpointListeners.list); ++j) { + pubsub_announce_endpoint_listener_t *listener; + listener = celix_arrayList_get(manager->announceEndpointListeners.list, j); + listener->removeEndpoint(listener->handle, endpoint); } - hashMapIterator_destroy(iter); - celixThreadMutex_unlock(&manager->discoveryListLock); + celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); + celix_properties_destroy(endpoint); } + celix_arrayList_destroy(endpointsList); } - hashMapIterator_destroy(pubit); - - celixThreadMutex_unlock(&manager->publicationsLock); - - /* Deactivate all subscriptions */ - celixThreadMutex_lock(&manager->subscriptionsLock); - hash_map_iterator_pt subit = hashMapIterator_create(manager->subscriptions); - while(hashMapIterator_hasNext(subit)){ - // TODO do some error checking - char* scope_topic = (char*)hashMapIterator_nextKey(subit); - char scope[MAX_TOPIC_LEN]; - char topic[MAX_TOPIC_LEN]; - memset(scope, 0 , MAX_TOPIC_LEN*sizeof(char)); - memset(topic, 0 , MAX_TOPIC_LEN*sizeof(char)); - sscanf(scope_topic, "%[^:]:%s", scope, topic ); - status += psa->closeAllSubscriptions(psa->admin,scope, topic); - } - hashMapIterator_destroy(subit); - celixThreadMutex_unlock(&manager->subscriptionsLock); - celixThreadMutex_lock(&manager->psaListLock); - arrayList_removeElement(manager->psaList, psa); - celixThreadMutex_unlock(&manager->psaListLock); + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "PSTM: Removed PSA"); +} - logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Removed PSA"); +static void pstm_setupTopicReceiverCallback(void *handle, void *svc) { + pstm_topic_receiver_or_sender_entry_t *entry = handle; + pubsub_admin_service_t *psa = svc; + psa->setupTopicReciever(psa->handle, entry->scope, entry->topic, entry->selectedSerializerSvcId, &entry->endpoint); } -void pubsub_topologyManager_subscriberAdded(void * handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) { +void pubsub_topologyManager_subscriberAdded(void * handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) { pubsub_topology_manager_t *manager = handle; - //subscriber_service_pt subscriber = (subscriber_service_pt)service; + //NOTE new local subscriber service register + //1) First trying to see if a TopicReceiver already exists for this subscriber, if found + //2) update the usage count. if not found + //3) Create new entry, find matching psa and serializer and broadcast cond, so that the psaHandling thread will + // call the psa to setup the topic receiver and announce the endpoint. + + const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL); + const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default"); + if (topic == NULL) { + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, + "[PSTM] Warning found subscriber service without mandatory %s property.", + PUBSUB_SUBSCRIBER_TOPIC); + return; + } - pubsub_endpoint_pt sub = NULL; - if(pubsubEndpoint_createFromSvc(manager->context, bnd, props,false, &sub) == CELIX_SUCCESS) { - celixThreadMutex_lock(&manager->subscriptionsLock); - char *sub_key = pubsubEndpoint_createScopeTopicKey(properties_get(sub->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(sub->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); + long bndId = celix_bundle_getId(bnd); + char *scopeAndTopicKey = NULL; + scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic); - array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key); - if(sub_list_by_topic==NULL){ - arrayList_create(&sub_list_by_topic); - hashMap_put(manager->subscriptions,strdup(sub_key),sub_list_by_topic); - } - free(sub_key); - arrayList_add(sub_list_by_topic,sub); - - celixThreadMutex_unlock(&manager->subscriptionsLock); - - unsigned int j; - double score = 0; - double best_score = 0; - pubsub_admin_service_pt best_psa = NULL; - celixThreadMutex_lock(&manager->psaListLock); - for(j=0;j<arrayList_size(manager->psaList);j++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j); - psa->matchEndpoint(psa->admin,sub,&score); - if (score > best_score) { /* We have a new winner! */ - best_score = score; - best_psa = psa; + celixThreadMutex_lock(&manager->topicReceivers.mutex); + pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicReceivers.map, scopeAndTopicKey); + if (entry != NULL) { + entry->usageCount += 1; + free(scopeAndTopicKey); + } + celixThreadMutex_unlock(&manager->topicReceivers.mutex); + + if (entry == NULL) { + //new TopicReceiver needed -> matching for psa/serializer + entry = calloc(1, sizeof(*entry)); + entry->scopeAndTopicKey = scopeAndTopicKey; //note taking owner ship + entry->selectedPsaSvcId = -1L; + entry->selectedSerializerSvcId = -1L; + entry->usageCount = 1; + + double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE; + long serializerSvcId = -1L; + long selectedPsasvcId = -1L; + + celixThreadMutex_lock(&manager->pubsubadmins.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(manager->pubsubadmins.map); + while (hashMapIterator_hasNext(&iter)) { + hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter); + long svcId = (long) hashMapEntry_getKey(mapEntry); + pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry); + double score = PUBSUB_ADMIN_NO_MATCH_SCORE; + long serSvcId = -1L; + + psa->matchSubscriber(psa->handle, bndId, props, &score, &serSvcId); + if (score > highestScore) { + highestScore = score; + serializerSvcId = serSvcId; + selectedPsasvcId = svcId; } } + celixThreadMutex_unlock(&manager->pubsubadmins.mutex); - if (best_psa != NULL && best_score>0) { - best_psa->addSubscription(best_psa->admin,sub); - } - - // Inform discoveries for interest in the topic - celixThreadMutex_lock(&manager->discoveryListLock); - hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList); - while(hashMapIterator_hasNext(iter)){ - service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); - pubsub_announce_endpoint_listener_t *disc = NULL; - bundleContext_getService(manager->context, disc_sr, (void**) &disc); - disc->announceEndpoint(disc->handle, sub->properties); - bundleContext_ungetService(manager->context, disc_sr, NULL); - } - hashMapIterator_destroy(iter); - celixThreadMutex_unlock(&manager->discoveryListLock); - - celixThreadMutex_unlock(&manager->psaListLock); - } -} - - -void pubsub_topologyManager_subscriberRemoved(void * handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) { - pubsub_topology_manager_t *manager = handle; - - pubsub_endpoint_pt subcmp = NULL; - if (pubsubEndpoint_createFromSvc(manager->context, bnd, props, false, &subcmp) == CELIX_SUCCESS){ - - unsigned int j,k; + if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) { + entry->selectedPsaSvcId = selectedPsasvcId; + entry->selectedSerializerSvcId = serializerSvcId; - // Inform discoveries that we not interested in the topic any more - celixThreadMutex_lock(&manager->discoveryListLock); - hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList); - while(hashMapIterator_hasNext(iter)){ - service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); - pubsub_announce_endpoint_listener_t *disc = NULL; - bundleContext_getService(manager->context, disc_sr, (void**) &disc); - disc->removeEndpoint(disc->handle, subcmp->properties); - bundleContext_ungetService(manager->context, disc_sr, NULL); - } - hashMapIterator_destroy(iter); - celixThreadMutex_unlock(&manager->discoveryListLock); - - celixThreadMutex_lock(&manager->subscriptionsLock); - celixThreadMutex_lock(&manager->psaListLock); - - char *sub_key = pubsubEndpoint_createScopeTopicKey(properties_get(subcmp->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE),properties_get(subcmp->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); - array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key); - free(sub_key); - if(sub_list_by_topic!=NULL){ - for(j=0;j<arrayList_size(sub_list_by_topic);j++){ - pubsub_endpoint_pt sub = arrayList_get(sub_list_by_topic,j); - if(pubsubEndpoint_equals(sub,subcmp)){ - for(k=0;k<arrayList_size(manager->psaList);k++){ - /* No problem with invoking removal on all psa's, only the one that manage this topic will do something */ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k); - psa->removeSubscription(psa->admin,sub); - } + celix_bundleContext_useServiceWithId(manager->context, selectedPsasvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, + pstm_setupTopicReceiverCallback); - } - arrayList_remove(sub_list_by_topic,j); + if (entry->endpoint != NULL) { + entry->scope = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL); + entry->scope = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL); + entry->endpointUUID = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL); - /* If it was the last subscriber for this topic, tell PSA to close the ZMQ socket */ - if(arrayList_size(sub_list_by_topic)==0){ - for(k=0;k<arrayList_size(manager->psaList);k++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k); - psa->closeAllSubscriptions(psa->admin, (char*) properties_get(subcmp->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(subcmp->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); - } + //announce new endpoint through the network + celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); + for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) { + pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i); + listener->announceEndpoint(listener->handle, entry->endpoint); } - - pubsubEndpoint_destroy(sub); - + celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); + + //store topic receiver. + //TODO race condition if multiple scope/topic combinations are request -> broader the lock? + celixThreadMutex_lock(&manager->topicReceivers.mutex); + hashMap_put(manager->topicReceivers.map, entry->scopeAndTopicKey, entry); + celixThreadMutex_unlock(&manager->topicReceivers.mutex); + } else { + free(entry->scopeAndTopicKey); + free(entry); + //ignore -> psa unregistered in meantime } } - - celixThreadMutex_unlock(&manager->psaListLock); - celixThreadMutex_unlock(&manager->subscriptionsLock); - - pubsubEndpoint_destroy(subcmp); - } } -void pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, void *svc, const celix_properties_t *props) { - pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *)handle; - pubsub_announce_endpoint_listener_t *disc = svc; +void pubsub_topologyManager_subscriberRemoved(void * handle, void *svc __attribute__((unused)), const celix_properties_t *props, const celix_bundle_t *bnd) { + pubsub_topology_manager_t *manager = handle; + //NOTE local subscriber service unregister + //1) Find topic receiver and decrease count - const char* fwUUID = NULL; + const char *topic = celix_properties_get(props, PUBSUB_SUBSCRIBER_TOPIC, NULL); + const char *scope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, "default"); - bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); - if(fwUUID==NULL){ - printf("PSD: ERRROR: Cannot retrieve fwUUID.\n"); + if (topic == NULL) { return; } - celixThreadMutex_lock(&manager->publicationsLock); + char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic); + celixThreadMutex_lock(&manager->topicReceivers.mutex); + pstm_topic_receiver_or_sender_entry_t *entry = hashMap_remove(manager->topicReceivers.map, scopeAndTopicKey); + if (entry != NULL) { + entry->usageCount -= 0; + } + celixThreadMutex_unlock(&manager->topicReceivers.mutex); + free(scopeAndTopicKey); - celixThreadMutex_lock(&manager->discoveryListLock); - long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L); - hashMap_put(manager->discoveryList, (void*)svcId, NULL); - celixThreadMutex_unlock(&manager->discoveryListLock); - - hash_map_iterator_pt iter = hashMapIterator_create(manager->publications); - while(hashMapIterator_hasNext(iter)){ - array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter); - for(unsigned int i = 0; i < arrayList_size(pubEP_list); i++) { - pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i); - if( (strcmp(properties_get(pubEP->properties, PUBSUB_ENDPOINT_FRAMEWORK_UUID),fwUUID)==0)) { - disc->announceEndpoint(disc->handle,pubEP->properties); - } + //NOTE not waking up psaHandling thread, topic receiver does not need to be removed immediately. +} + +void pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void* handle, void *svc, const celix_properties_t *props __attribute__((unused))) { + pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *)handle; + pubsub_announce_endpoint_listener_t *listener = svc; + + //1) retroactively call announceEndpoint for already existing endpoints (manager->announcedEndpoints) + //2) Add listener to manager->announceEndpointListeners + + celixThreadMutex_lock(&manager->announcedEndpoints.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(manager->announcedEndpoints.map); + while (hashMapIterator_hasNext(&iter)) { + celix_array_list_t *endpoints = hashMapIterator_nextValue(&iter); + for (int i = 0; i < celix_arrayList_size(endpoints); ++i) { + celix_properties_t *ep = celix_arrayList_get(endpoints, i); + listener->announceEndpoint(listener->handle, ep); } } - hashMapIterator_destroy(iter); - - celixThreadMutex_unlock(&manager->publicationsLock); + celixThreadMutex_unlock(&manager->announcedEndpoints.mutex); - celixThreadMutex_lock(&manager->subscriptionsLock); - iter = hashMapIterator_create(manager->subscriptions); + celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); + celix_arrayList_add(manager->announceEndpointListeners.list, listener); + celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); +} - while(hashMapIterator_hasNext(iter)) { - array_list_pt l = (array_list_pt)hashMapIterator_nextValue(iter); - unsigned int i; - for(i=0;i<arrayList_size(l);i++){ - pubsub_endpoint_pt subEp = (pubsub_endpoint_pt)arrayList_get(l,i); - disc->announceEndpoint(disc->handle, subEp->properties); +void pubsub_topologyManager_pubsubAnnounceEndpointListenerRemoved(void * handle, void *svc, const celix_properties_t *props __attribute__((unused))) { + pubsub_topology_manager_t *manager = (pubsub_topology_manager_t *)handle; + pubsub_announce_endpoint_listener_t *listener = svc; + + //1) Remove listener from manager->announceEndpointListeners + //2) call removeEndpoint for already existing endpoints (manager->announcedEndpoints) + + celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); + celix_arrayList_remove(manager->announceEndpointListeners.list, listener); + celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); + + celixThreadMutex_lock(&manager->announcedEndpoints.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(manager->announcedEndpoints.map); + while (hashMapIterator_hasNext(&iter)) { + celix_array_list_t *endpoints = hashMapIterator_nextValue(&iter); + for (int i = 0; i < celix_arrayList_size(endpoints); ++i) { + celix_properties_t *ep = celix_arrayList_get(endpoints, i); + listener->removeEndpoint(listener->handle, ep); } } - hashMapIterator_destroy(iter); - celixThreadMutex_unlock(&manager->subscriptionsLock); + celixThreadMutex_unlock(&manager->announcedEndpoints.mutex); } -void pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, void *svc, const celix_properties_t *props) { - pubsub_topology_manager_t *manager = handle; +static void pstm_setupTopicSenderCallback(void *handle, void *svc) { + pstm_topic_receiver_or_sender_entry_t *entry = handle; + pubsub_admin_service_t *psa = svc; + psa->setupTopicSender(psa->handle, entry->scope, entry->topic, entry->selectedSerializerSvcId, &entry->endpoint); +} - celixThreadMutex_lock(&manager->discoveryListLock); +void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_service_tracker_info_t *info) { + pubsub_topology_manager_t *manager = handle; + //NOTE new local subscriber service register + //1) First trying to see if a TopicReceiver already exists for this subscriber, if found + //2) update the usage count. if not found + //3) Try to find a matching psa and create a new TopicReceiver. - long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1L); - if (hashMap_remove(manager->discoveryList, (void*)svcId)) { - logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "EndpointListener Removed"); + //TODO FIXME + if (strcmp(info->serviceName, PUBSUB_PUBLISHER_SERVICE_NAME) != 0) { + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Bug. trackServiceTracker should only trigger for %s. Now triggering on %s", PUBSUB_PUBLISHER_SERVICE_NAME, info->serviceName); + return; } - celixThreadMutex_unlock(&manager->discoveryListLock); -} - -static void tm_callAnnounce(void *handle, void *svc) { - pubsub_endpoint_t *pub = handle; - pubsub_announce_endpoint_listener_t *listener = svc; - listener->announceEndpoint(listener->handle, pub->properties); -} - -void pubsub_topologyManager_publisherTrackerAdded(void *handle, const celix_service_tracker_info_t *info) { - pubsub_topology_manager_t *manager = handle; - pubsub_endpoint_pt pub = NULL; - celix_status_t status = pubsubEndpoint_createFromListenerHookInfo(manager->context, info, true, &pub); - if (status == CELIX_SUCCESS) { + char *topic = NULL; + char *scopeFromFilter = NULL; + pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topic, &scopeFromFilter); + const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter; - celixThreadMutex_lock(&manager->publicationsLock); - char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pub->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pub->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); - array_list_pt pub_list_by_topic = hashMap_get(manager->publications, pub_key); - if(pub_list_by_topic==NULL){ - arrayList_create(&pub_list_by_topic); - hashMap_put(manager->publications,pub_key,pub_list_by_topic); - } else { - free(pub_key); - } - arrayList_add(pub_list_by_topic,pub); - - celixThreadMutex_unlock(&manager->publicationsLock); - - unsigned int j; - double score = 0; - double best_score = 0; - pubsub_admin_service_pt best_psa = NULL; - celixThreadMutex_lock(&manager->psaListLock); - - int size = celix_arrayList_size(manager->psaList); - for (j=0; j<size; j++) { - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j); - psa->matchEndpoint(psa->admin,pub,&score); - if(score>best_score){ /* We have a new winner! */ - best_score = score; - best_psa = psa; - } + char *scopeAndTopicKey = NULL; + if (topic == NULL) { + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, + "[PSTM] Warning found publisher service request without mandatory '%s' filter attribute.", PUBSUB_SUBSCRIBER_TOPIC); + } else { + scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic); + celixThreadMutex_lock(&manager->topicSenders.mutex); + pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey); + if (entry != NULL) { + entry->usageCount += 1; } + celixThreadMutex_unlock(&manager->topicSenders.mutex); + + if (entry == NULL) { + //new topic receiver needed, requesting match with current psa + double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE; + long serializerSvcId = -1L; + long selectedPsasvcId = -1L; + + celixThreadMutex_lock(&manager->pubsubadmins.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(manager->pubsubadmins.map); + while (hashMapIterator_hasNext(&iter)) { + hash_map_entry_t *entry = hashMapIterator_nextEntry(&iter); + long svcId = (long)hashMapEntry_getKey(entry); + pubsub_admin_service_t *psa = hashMapEntry_getValue(entry); + double score = PUBSUB_ADMIN_NO_MATCH_SCORE; + long serSvcId = -1L; + psa->matchPublisher(psa->handle, info->bundleId, info->filter, &score, &serSvcId); + if (score > highestScore) { + highestScore = score; + serializerSvcId = serSvcId; + selectedPsasvcId = svcId; + } + } + celixThreadMutex_unlock(&manager->pubsubadmins.mutex); + + if (highestScore > PUBSUB_ADMIN_NO_MATCH_SCORE) { + entry = calloc(1, sizeof(*entry)); + entry->scopeAndTopicKey = scopeAndTopicKey; + entry->usageCount = 1; + entry->selectedPsaSvcId = selectedPsasvcId; + entry->selectedSerializerSvcId = serializerSvcId; + entry->topic = topic; //NOTE tmp + entry->scope = scope; //NOTE tmp + + celix_bundleContext_useServiceWithId(manager->context, selectedPsasvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_setupTopicSenderCallback); + + if (entry->endpoint != NULL) { + //note psa->setupTopicSender has created the endpoint. + entry->scope = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL); + entry->topic = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL); + entry->endpointUUID = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL); + } else { + free(entry->scopeAndTopicKey); + free(entry); + entry = NULL; + //ignore -> psa unregistered in meantime + } - if (best_psa != NULL && best_score > 0) { - celix_status_t status = best_psa->addPublication(best_psa->admin,pub); - if(status==CELIX_SUCCESS){ - celixThreadMutex_lock(&manager->discoveryListLock); - hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveryList); - while(hashMapIterator_hasNext(&iter)) { - long svcId = (long)hashMapIterator_nextKey(&iter); - celix_bundleContext_useServiceWithId(manager->context, svcId, PUBSUB_ANNOUNCE_ENDPOINT_LISTENER_SERVICE, pub, tm_callAnnounce); + if (entry != NULL) { + //announce new endpoint through the network + celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); + for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) { + pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i); + listener->announceEndpoint(listener->handle, entry->endpoint); + } + celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); + + //store topic sender. + celixThreadMutex_lock(&manager->topicSenders.mutex); + hashMap_put(manager->topicSenders.map, entry->scopeAndTopicKey, entry); + celixThreadMutex_unlock(&manager->topicSenders.mutex); + + const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); + const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, + "[PSTM] setting up new TopicSender for scope/topic %s/%s with psa admin type %s and serializer %s\n", + entry->scope, entry->topic, adminType, serType); } - celixThreadMutex_unlock(&manager->discoveryListLock); } + } else { + free(scopeAndTopicKey); } - - celixThreadMutex_unlock(&manager->psaListLock); - } + free(scopeFromFilter); } - void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_service_tracker_info_t *info) { pubsub_topology_manager_t *manager = handle; - pubsub_endpoint_pt pubcmp = NULL; - if(pubsubEndpoint_createFromListenerHookInfo(manager->context, info, true, &pubcmp) == CELIX_SUCCESS){ - unsigned int j,k; - celixThreadMutex_lock(&manager->psaListLock); - celixThreadMutex_lock(&manager->publicationsLock); - - char *pub_key = pubsubEndpoint_createScopeTopicKey(properties_get(pubcmp->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), properties_get(pubcmp->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); - array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key); - if(pub_list_by_topic!=NULL){ - for(j=0;j<arrayList_size(pub_list_by_topic);j++){ - pubsub_endpoint_pt pub = arrayList_get(pub_list_by_topic,j); - if(pubsubEndpoint_equals(pub,pubcmp)){ - for(k=0;k<arrayList_size(manager->psaList);k++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k); - celix_status_t status = psa->removePublication(psa->admin,pub); - if(status==CELIX_SUCCESS){ /* We found the one that manages this endpoint */ - celixThreadMutex_lock(&manager->discoveryListLock); - hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList); - while(hashMapIterator_hasNext(iter)){ - service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); - pubsub_announce_endpoint_listener_t *disc = NULL; - bundleContext_getService(manager->context, disc_sr, (void**) &disc); - disc->removeEndpoint(disc->handle,pub->properties); - bundleContext_ungetService(manager->context, disc_sr, NULL); - } - hashMapIterator_destroy(iter); - celixThreadMutex_unlock(&manager->discoveryListLock); - } - else if(status == CELIX_ILLEGAL_ARGUMENT){ /* Not a real error, just saying this psa does not handle this endpoint */ - status = CELIX_SUCCESS; - } - } - //} - arrayList_remove(pub_list_by_topic,j); - - /* If it was the last publisher for this topic, tell PSA to close the ZMQ socket and then inform the discovery */ - if(arrayList_size(pub_list_by_topic)==0){ - for(k=0;k<arrayList_size(manager->psaList);k++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k); - psa->closeAllPublications(psa->admin, (char*) properties_get(pub->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE), (char*) properties_get(pub->properties, PUBSUB_ENDPOINT_TOPIC_NAME)); - } - } - - pubsubEndpoint_destroy(pub); - } + //NOTE local subscriber service unregister + //1) Find topic sender and decrease count - } - } + //TODO FIXME + if (strcmp(info->serviceName, PUBSUB_PUBLISHER_SERVICE_NAME) != 0) { + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_WARNING, "Bug. trackServiceTracker should only trigger for %s. Now triggering on %s", PUBSUB_PUBLISHER_SERVICE_NAME, info->serviceName); + return; + } - celixThreadMutex_unlock(&manager->publicationsLock); - celixThreadMutex_unlock(&manager->psaListLock); + char *topic = NULL; + char *scopeFromFilter = NULL; + pubsub_getPubSubInfoFromFilter(info->filter->filterStr, &topic, &scopeFromFilter); + const char *scope = scopeFromFilter == NULL ? "default" : scopeFromFilter; - free(pub_key); + if (topic == NULL) { + free(scopeFromFilter); + return; + } - pubsubEndpoint_destroy(pubcmp); + char *scopeAndTopicKey = pubsubEndpoint_createScopeTopicKey(scope, topic); + celixThreadMutex_lock(&manager->topicSenders.mutex); + pstm_topic_receiver_or_sender_entry_t *entry = hashMap_get(manager->topicSenders.map, scopeAndTopicKey); + if (entry != NULL) { + entry->usageCount -= 1; } + celixThreadMutex_unlock(&manager->topicSenders.mutex); + + free(scopeAndTopicKey); + free(scopeFromFilter); +} + +static void pstm_addEndpointCallback(void *handle, void *svc) { + celix_properties_t *endpoint = handle; + pubsub_admin_service_t *psa = svc; + psa->addEndpoint(psa->handle, endpoint); } -static celix_status_t pubsub_topologyManager_addDiscoveredPublisher(void *handle, const celix_properties_t *pubProperties){ +celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint){ celix_status_t status = CELIX_SUCCESS; pubsub_topology_manager_t *manager = handle; - const char *topic = celix_properties_get(pubProperties, PUBSUB_ENDPOINT_TOPIC_NAME, NULL); - const char *scope = celix_properties_get(pubProperties, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL); - const char *fwUid = celix_properties_get(pubProperties, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL); - const char *uuid = celix_properties_get(pubProperties, PUBSUB_ENDPOINT_UUID, NULL); + const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL); + assert(uuid != NULL); //discovery should check if endpoint is valid -> pubsubEndoint_isValid. + // 1) See if endpoint is already discovered, if so increase usage count. + // 1) If not, find matching psa using the matchEndpoint + // 2) if found call addEndpoint of the matching psa - if (manager->verbose) { - printf("PSTM: New publisher discovered for scope/topic %s/%s [fwUUID=%s, epUUID=%s]\n", - scope, topic, fwUid, uuid); - } - - - celixThreadMutex_lock(&manager->psaListLock); - celixThreadMutex_lock(&manager->publicationsLock); + if (manager->verbose) { + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, + "PSTM: Discovered endpoint added for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n", + celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL), + celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL), + celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL), + uuid); + } - char *pub_key = pubsubEndpoint_createScopeTopicKey(scope, topic); - array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key); - if(pub_list_by_topic==NULL){ - arrayList_create(&pub_list_by_topic); - hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic); + celixThreadMutex_lock(&manager->discoveredEndpoints.mutex); + pstm_discovered_endpoint_entry_t *entry = hashMap_get(manager->discoveredEndpoints.map, uuid); + if (entry != NULL) { + //already existing endpoint -> increase usage + entry->usageCount += 1; } - free(pub_key); - - /* Shouldn't be any other duplicate, since it's filtered out by the discovery */ - pubsub_endpoint_pt p = NULL; - pubsubEndpoint_createFromProperties(pubProperties, &p); - arrayList_add(pub_list_by_topic , p); - - unsigned int j; - double score = 0; - double best_score = 0; - pubsub_admin_service_pt best_psa = NULL; - - for(j=0;j<arrayList_size(manager->psaList);j++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j); - psa->matchEndpoint(psa->admin , p, &score); - if (score>best_score) { /* We have a new winner! */ - best_score = score; - best_psa = psa; + celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex); + + if (entry == NULL) { + + //new endpoint -> new entry + entry = calloc(1, sizeof(*entry)); + entry->usageCount = 1; + entry->endpoint = celix_properties_copy(endpoint); + entry->uuid = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_UUID, NULL); + entry->selectedPsaSvcId = -1L; + + double highestScore = PUBSUB_ADMIN_NO_MATCH_SCORE; + long psaSvcId = -1L; + + celixThreadMutex_lock(&manager->pubsubadmins.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(manager->pubsubadmins.map); + while (hashMapIterator_hasNext(&iter)) { + hash_map_entry_t *mapEntry = hashMapIterator_nextEntry(&iter); + pubsub_admin_service_t *psa = hashMapEntry_getValue(mapEntry); + long svcId = (long) hashMapEntry_getKey(mapEntry); + double score = PUBSUB_ADMIN_NO_MATCH_SCORE; + psa->matchEndpoint(psa->handle, endpoint, &score); + if (score > highestScore) { + highestScore = score; + psaSvcId = svcId; + } + } + celixThreadMutex_unlock(&manager->pubsubadmins.mutex); + + if (psaSvcId >= 0) { + //psa called outside of mutex, this means the it can happen that addEndpointCallback is not called. + //for now this is expected behaviour; + //You need to start the pubsub admin stuff before the bundles using pubsub. + celix_bundleContext_useServiceWithId(manager->context, psaSvcId, PUBSUB_ADMIN_SERVICE_NAME, + (void *) endpoint, pstm_addEndpointCallback); + } else { + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "Cannot find psa for endpoint %s\n", entry->uuid); } - } - if(best_psa != NULL && best_score>0) { - //TODO FIXME this the same call as used by publisher of service trackers. This is confusing. - //remote discovered publication can be handle different. - best_psa->addPublication(best_psa->admin,p); - } - else{ - status = CELIX_ILLEGAL_STATE; + entry->selectedPsaSvcId = psaSvcId; + celixThreadMutex_lock(&manager->discoveredEndpoints.mutex); + hashMap_put(manager->discoveredEndpoints.map, (void*)entry->uuid, entry); + celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex); } - celixThreadMutex_unlock(&manager->publicationsLock); - celixThreadMutex_unlock(&manager->psaListLock); + return status; +} - return status; +static void pstm_removeEndpointCallback(void *handle, void *svc) { + celix_properties_t *endpoint = handle; + pubsub_admin_service_t *psa = svc; + psa->removeEndpoint(psa->handle, endpoint); } -static celix_status_t pubsub_topologyManager_removeDiscoveredPublisher(void *handle, const celix_properties_t *props) { +celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint) { pubsub_topology_manager_t *manager = handle; - if (manager->verbose) { - printf("PSTM: Publisher removed for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n", - celix_properties_get(props, PUBSUB_ENDPOINT_TOPIC_NAME, NULL), - celix_properties_get(props, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL), - celix_properties_get(props, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL), - celix_properties_get(props, PUBSUB_ENDPOINT_UUID, NULL)); - } + // 1) See if endpoint is already discovered, if so decrease usage count. + // 1) If usage count becomes 0, find matching psa using the matchEndpoint + // 2) if found call disconnectEndpoint of the matching psa - celixThreadMutex_lock(&manager->psaListLock); - celixThreadMutex_lock(&manager->publicationsLock); - unsigned int i; + const char *uuid = celix_properties_get(endpoint, PUBSUB_ENDPOINT_UUID, NULL); + assert(uuid != NULL); //discovery should check if endpoint is valid -> pubsubEndoint_isValid. - char *pub_key = pubsubEndpoint_createScopeTopicKey(celix_properties_get(props, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL), celix_properties_get(props, PUBSUB_ENDPOINT_TOPIC_NAME, NULL)); - array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key); - if(pub_list_by_topic==NULL){ - printf("PSTM: ERROR: Cannot find topic for known endpoint [%s,%s,%s]. Something is inconsistent.\n",pub_key,celix_properties_get(props, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL),celix_properties_get(props, "pubsub.url", NULL)); + if (manager->verbose) { + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, + "PSTM: Discovered endpoint removed for topic %s with scope %s [fwUUID=%s, epUUID=%s]\n", + celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL), + celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL), + celix_properties_get(endpoint, PUBSUB_ENDPOINT_FRAMEWORK_UUID, NULL), + uuid); } - else{ - - pubsub_endpoint_pt p = NULL; - bool found = false; - for(i=0;!found && i<arrayList_size(pub_list_by_topic);i++){ - p = (pubsub_endpoint_pt)arrayList_get(pub_list_by_topic,i); - found = pubsubEndpoint_equalsWithProperties(p,props); + celixThreadMutex_lock(&manager->discoveredEndpoints.mutex); + pstm_discovered_endpoint_entry_t *entry = hashMap_get(manager->discoveredEndpoints.map, uuid); + if (entry != NULL) { + //already existing endpoint -> decrease usage + entry->usageCount-= 1; + if (entry->usageCount <= 0) { + hashMap_remove(manager->discoveredEndpoints.map, entry->uuid); + } else { + entry = NULL; //still used (usage count > 0) -> do nothing + } + } + celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex); + + if (entry != NULL) { + //note entry is removed from manager->discoveredEndpoints, also inform used psa + if (entry->selectedPsaSvcId >= 0) { + //note that it is possible that the psa is already gone, in that case the call is also not needed anymore. + celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, + (void *) endpoint, pstm_removeEndpointCallback); + } else { + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, "No selected psa for endpoint %s\n", entry->uuid); } + celix_properties_destroy(entry->endpoint); + free(entry); + } - if(found && p !=NULL){ - for(i=0;i<arrayList_size(manager->psaList);i++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i); - /* No problem with invoking removal on all psa's, only the one that manage this topic will do something */ - psa->removePublication(psa->admin,p); - } + return CELIX_SUCCESS; +} - arrayList_removeElement(pub_list_by_topic,p); - /* If it was the last publisher for this topic, tell PSA to close the ZMQ socket */ - if(arrayList_size(pub_list_by_topic)==0){ +static void pstm_teardownTopicSenderCallback(void *handle, void *svc) { + pstm_topic_receiver_or_sender_entry_t *entry = handle; + pubsub_admin_service_t *psa = svc; + psa->teardownTopicSender(psa->handle, entry->scope, entry->topic); +} - for(i=0;i<arrayList_size(manager->psaList);i++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i); - psa->closeAllPublications(psa->admin, (char*) celix_properties_get(p->properties, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL), (char*) celix_properties_get(p->properties, PUBSUB_ENDPOINT_TOPIC_NAME, NULL)); - } - } +static void pstm_teardownTopicSenders(pubsub_topology_manager_t *manager) { + celixThreadMutex_lock(&manager->topicSenders.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(manager->topicSenders.map); + while (hashMapIterator_hasNext(&iter)) { + pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); + + if (entry != NULL && entry->usageCount <= 0) { + hashMapIterator_remove(&iter); + if (manager->verbose) { + const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); + const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, + "[PSTM] Tearing down TopicSender for scope/topic %s/%s with psa admin type %s and serializer %s\n", + entry->scope, entry->topic, adminType, serType); + } + + celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); + for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) { + pubsub_announce_endpoint_listener_t *listener; + listener = celix_arrayList_get(manager->announceEndpointListeners.list, i); + listener->removeEndpoint(listener->handle, entry->endpoint); + } + celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); + + celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, + entry, pstm_teardownTopicSenderCallback); + + + //cleanup entry + free(entry->scopeAndTopicKey); + celix_properties_destroy(entry->endpoint); + free(entry); + } + } + celixThreadMutex_unlock(&manager->topicSenders.mutex); +} - pubsubEndpoint_destroy(p); - } +static void pstm_teardownTopicReceiverCallback(void *handle, void *svc) { + pstm_topic_receiver_or_sender_entry_t *entry = handle; + pubsub_admin_service_t *psa = svc; + psa->teardownTopicSender(psa->handle, entry->scope, entry->topic); +} + +static void pstm_teardownTopicReceivers(pubsub_topology_manager_t *manager) { + celixThreadMutex_lock(&manager->topicReceivers.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(manager->topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); + if (entry != NULL && entry->usageCount <= 0) { + hashMapIterator_remove(&iter); + + if (manager->verbose) { + const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); + const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_DEBUG, + "[PSTM] Tearing down TopicReceiver for scope/topic %s/%s with psa admin type %s and serializer %s\n", + entry->scope, entry->topic, adminType, serType); + } + + celix_bundleContext_useServiceWithId(manager->context, entry->selectedPsaSvcId, PUBSUB_ADMIN_SERVICE_NAME, entry, pstm_teardownTopicReceiverCallback); + celixThreadMutex_lock(&manager->announceEndpointListeners.mutex); + for (int i = 0; i < celix_arrayList_size(manager->announceEndpointListeners.list); ++i) { + pubsub_announce_endpoint_listener_t *listener = celix_arrayList_get(manager->announceEndpointListeners.list, i); + listener->removeEndpoint(listener->handle, entry->endpoint); + } + celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex); + + //cleanup entry + free(entry->scopeAndTopicKey); + celix_properties_destroy(entry->endpoint); + free(entry); + } + } + celixThreadMutex_unlock(&manager->topicReceivers.mutex); +} +static void* pstm_psaHandlingThread(void *data) { + pubsub_topology_manager_t *manager = data; - } - free(pub_key); - celixThreadMutex_unlock(&manager->publicationsLock); - celixThreadMutex_unlock(&manager->psaListLock); + celixThreadMutex_lock(&manager->psaHandling.mutex); + bool running = manager->psaHandling.running; + celixThreadMutex_unlock(&manager->psaHandling.mutex); + while (running) { + pstm_teardownTopicSenders(manager); + pstm_teardownTopicReceivers(manager); - return CELIX_SUCCESS; + celixThreadMutex_lock(&manager->psaHandling.mutex); + celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, &manager->psaHandling.mutex, PSTM_CLEANUP_SLEEPTIME_IN_SECONDS, 0L); + running = manager->psaHandling.running; + celixThreadMutex_unlock(&manager->psaHandling.mutex); + } + return NULL; } -celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_properties_t *properties) { - const char *type = celix_properties_get(properties, PUBSUB_ENDPOINT_TYPE, NULL); - if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { - return pubsub_topologyManager_addDiscoveredPublisher(handle, properties); - } else if (type != NULL && strncmp(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, type, strlen(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE)) == 0) { - //nop //TODO add subscription to pubsub admins - } else { - fprintf(stderr, "Invalid endpoint. Endpoint has no endpoint type (key: %s)\n", PUBSUB_ENDPOINT_TYPE); - } - return CELIX_SUCCESS; -} -celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, const celix_properties_t *properties) { - const char *type = celix_properties_get(properties, PUBSUB_ENDPOINT_TYPE, NULL); - if (type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0) { - pubsub_topologyManager_removeDiscoveredPublisher(handle, properties); - } else if (type != NULL && strncmp(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, type, strlen(PUBSUB_SUBSCRIBER_ENDPOINT_TYPE)) == 0) { - //nop //TODO remove subscription from pubsub admins - } else { - fprintf(stderr, "Invalid endpoint. Endpoint has no endpoint type (key: %s)\n", PUBSUB_ENDPOINT_TYPE); +celix_status_t pubsub_topologyManager_shellCommand(void *handle, char * commandLine __attribute__((unused)), FILE *os, FILE *errorStream __attribute__((unused))) { + pubsub_topology_manager_t *manager = handle; + //TODO add support for searching based on scope and topic + + fprintf(os, "\n"); + + fprintf(os, "Discovered Endpoints: \n"); + celixThreadMutex_lock(&manager->discoveredEndpoints.mutex); + hash_map_iterator_t iter = hashMapIterator_construct(manager->discoveredEndpoints.map); + while (hashMapIterator_hasNext(&iter)) { + pstm_discovered_endpoint_entry_t *discovered = hashMapIterator_nextValue(&iter); + const char *scope = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, "!Error!"); + const char *topic = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, "!Error!"); + const char *adminType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); + const char *serType = celix_properties_get(discovered->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); + fprintf(os, "|- Discovered Endpoint %s:\n", discovered->uuid); + fprintf(os, " |- scope = %s\n", scope); + fprintf(os, " |- topic = %s\n", topic); + fprintf(os, " |- admin type = %s\n", adminType); + fprintf(os, " |- serializer = %s\n", serType); + if (manager->verbose) { + fprintf(os, " |- psa svc id = %li\n", discovered->selectedPsaSvcId); + fprintf(os, " |- usage count = %i\n", discovered->usageCount); + } + } + celixThreadMutex_unlock(&manager->discoveredEndpoints.mutex); + fprintf(os,"\n"); + + + fprintf(os, "Active Topic Senders:\n"); + celixThreadMutex_lock(&manager->topicSenders.mutex); + iter = hashMapIterator_construct(manager->topicSenders.map); + while (hashMapIterator_hasNext(&iter)) { + pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); + const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); + const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); + fprintf(os, "|- Topic Sender for endpoint %s:\n", entry->endpointUUID); + fprintf(os, " |- scope = %s\n", entry->scope); + fprintf(os, " |- topic = %s\n", entry->topic); + fprintf(os, " |- admin type = %s\n", adminType); + fprintf(os, " |- serializer = %s\n", serType); + if (manager->verbose) { + fprintf(os, " |- psa svc id = %li\n", entry->selectedPsaSvcId); + fprintf(os, " |- ser svc id = %li\n", entry->selectedSerializerSvcId); + fprintf(os, " |- usage count = %i\n", entry->usageCount); + } } + celixThreadMutex_unlock(&manager->topicSenders.mutex); + fprintf(os,"\n"); + + fprintf(os, "Active Topic Receivers:\n"); + celixThreadMutex_lock(&manager->topicReceivers.mutex); + iter = hashMapIterator_construct(manager->topicReceivers.map); + while (hashMapIterator_hasNext(&iter)) { + pstm_topic_receiver_or_sender_entry_t *entry = hashMapIterator_nextValue(&iter); + const char *adminType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_ADMIN_TYPE, "!Error!"); + const char *serType = celix_properties_get(entry->endpoint, PUBSUB_ENDPOINT_SERIALIZER, "!Error!"); + fprintf(os, "|- Topic Receiver for endpoint %s:\n", entry->endpointUUID); + fprintf(os, " |- scope = %s\n", entry->scope); + fprintf(os, " |- topic = %s\n", entry->topic); + fprintf(os, " |- admin type = %s\n", adminType); + fprintf(os, " |- serializer = %s\n", serType); + if (manager->verbose) { + fprintf(os, " |- psa svc id = %li\n", entry->selectedPsaSvcId); + fprintf(os, " |- ser svc id = %li\n", entry->selectedSerializerSvcId); + fprintf(os, " |- usage count = %i\n", entry->usageCount); + } + } + celixThreadMutex_unlock(&manager->topicReceivers.mutex); + fprintf(os,"\n"); + return CELIX_SUCCESS; } - - http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h ---------------------------------------------------------------------- diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h index dda84a0..105b797 100644 --- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h +++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h @@ -36,31 +36,68 @@ #define PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE false -struct pubsub_topology_manager { +typedef struct pubsub_topology_manager { bundle_context_pt context; - celix_thread_mutex_t psaListLock; - array_list_pt psaList; - - celix_thread_mutex_t discoveryListLock; - hash_map_pt discoveryList; //<svcId,NULL> - - celix_thread_mutex_t publicationsLock; - hash_map_pt publications; //<topic(string),list<pubsub_ep>> - - celix_thread_mutex_t subscriptionsLock; - hash_map_pt subscriptions; //<topic(string),list<pubsub_ep>> - - command_service_t shellCmdService; - service_registration_pt shellCmdReg; - + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = svcId, value = pubsub_admin_t* + } pubsubadmins; + + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = psa svc id, value = list<celix_properties_t /*endpoint*/> + } announcedEndpoints; + + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = uuid , value = pstm_discovered_endpoint_entry_t + } discoveredEndpoints; + + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = scope/topic key, value = pstm_topic_receiver_or_sender_entry_t* + } topicReceivers; + + struct { + celix_thread_mutex_t mutex; + hash_map_t *map; //key = scope/topic key, value = pstm_topic_receiver_or_sender_entry_t* + } topicSenders; + + struct { + celix_thread_mutex_t mutex; + celix_array_list_t *list; //<pubsub_announce_endpoint_listener_t*> + } announceEndpointListeners; + + struct { + celix_thread_t thread; + celix_thread_mutex_t mutex; //protect running and condition + celix_thread_cond_t cond; + bool running; + } psaHandling; log_helper_pt loghelper; bool verbose; -}; - -typedef struct pubsub_topology_manager pubsub_topology_manager_t; +} pubsub_topology_manager_t; + +typedef struct pstm_discovered_endpoint_entry { + const char *uuid; + long selectedPsaSvcId; + int usageCount; //note that discovered endpoints can be found multiple times by different pubsub discovery components + celix_properties_t *endpoint; +} pstm_discovered_endpoint_entry_t; + +typedef struct pstm_topic_receiver_or_sender_entry { + char *scopeAndTopicKey; //key of the combined value of the scope and topic + celix_properties_t *endpoint; + const char *topic; + const char *scope; + const char *endpointUUID; + int usageCount; //nr of subscriber service for the topic receiver (matching scope & topic) + long selectedPsaSvcId; + long selectedSerializerSvcId; +} pstm_topic_receiver_or_sender_entry_t; celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_t **manager); celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_t *manager); @@ -69,8 +106,8 @@ celix_status_t pubsub_topologyManager_closeImports(pubsub_topology_manager_t *ma void pubsub_topologyManager_psaAdded(void *handle, void *svc, const celix_properties_t *props); void pubsub_topologyManager_psaRemoved(void *handle, void *svc, const celix_properties_t *props); -void pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, void *svc, const celix_properties_t *props); -void pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, void *svc, const celix_properties_t *props); +void pubsub_topologyManager_pubsubAnnounceEndpointListenerAdded(void* handle, void *svc, const celix_properties_t *props); +void pubsub_topologyManager_pubsubAnnounceEndpointListenerRemoved(void * handle, void *svc, const celix_properties_t *props); void pubsub_topologyManager_subscriberAdded(void * handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd); void pubsub_topologyManager_subscriberRemoved(void * handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd); @@ -81,4 +118,6 @@ void pubsub_topologyManager_publisherTrackerRemoved(void *handle, const celix_se celix_status_t pubsub_topologyManager_addDiscoveredEndpoint(void *handle, const celix_properties_t *properties); celix_status_t pubsub_topologyManager_removeDiscoveredEndpoint(void *handle, const celix_properties_t *properties); +celix_status_t pubsub_topologyManager_shellCommand(void *handle, char * commandLine, FILE *outStream, FILE *errorStream); + #endif /* PUBSUB_TOPOLOGY_MANAGER_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/framework/include/celix_api.h ---------------------------------------------------------------------- diff --git a/libs/framework/include/celix_api.h b/libs/framework/include/celix_api.h index 5129a65..b941a78 100644 --- a/libs/framework/include/celix_api.h +++ b/libs/framework/include/celix_api.h @@ -21,32 +21,26 @@ #define CELIX_CELIX_API_H_ #include "properties.h" - #include "array_list.h" -#include "celix_array_list.h" - #include "constants.h" +#include "bundle.h" +#include "bundle_context.h" +#include "framework.h" +#include "celix_properties.h" +#include "celix_array_list.h" +//#include "celix_constants.h" #include "celix_utils_api.h" - -#include "bundle.h" #include "celix_bundle.h" - -#include "bundle_context.h" #include "celix_bundle_context.h" -#include "service_registration.h" -#include "service_factory.h" -#include "service_reference.h" -#include "service_tracker.h" -#include "service_tracker_customizer.h" -#include "listener_hook_service.h" - -#include "framework.h" +#include "celix_framework.h" #include "celix_framework_factory.h" #include "celix_launcher.h" #include "dm_dependency_manager.h" #include "dm_service_dependency.h" +#include "celix_bundle_activator.h" + #endif //CELIX_CELIX_API_H_ http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/framework/include/celix_bundle_context.h ---------------------------------------------------------------------- diff --git a/libs/framework/include/celix_bundle_context.h b/libs/framework/include/celix_bundle_context.h index 5bb6faa..76af9a5 100644 --- a/libs/framework/include/celix_bundle_context.h +++ b/libs/framework/include/celix_bundle_context.h @@ -357,6 +357,7 @@ typedef struct celix_service_tracker_options { .filter.versionRange = NULL, \ .filter.filter = NULL, \ .filter.serviceLanguage = NULL, \ + .filter.ignoreServiceLanguage = false, \ .callbackHandle = NULL, \ .set = NULL, \ .add = NULL, \ @@ -797,6 +798,16 @@ const char* celix_bundleContext_getProperty(celix_bundle_context_t *ctx, const c long celix_bundleContext_getPropertyAsLong(celix_bundle_context_t *ctx, const char *key, long defaultValue); /** + * Gets the config property as converts it to double. If the property is not a valid double, the defaultValue will be returned. + * The rest of the behaviour is the same as celix_bundleContext_getProperty. + + * @param key The key of the property to receive. + * @param defaultVal The default value to use if the property is not found. + * @return The property value for the provided key or the provided defaultValue is the key is not found. + */ +double celix_bundleContext_getPropertyAsDouble(celix_bundle_context_t *ctx, const char *key, double defaultValue); + +/** * Gets the config property as converts it to bool. If the property is not a valid bool, the defaultValue will be returned. * The rest of the behaviour is the same as celix_bundleContext_getProperty. http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/framework/src/bundle_context.c ---------------------------------------------------------------------- diff --git a/libs/framework/src/bundle_context.c b/libs/framework/src/bundle_context.c index b8b1552..1e86f2a 100644 --- a/libs/framework/src/bundle_context.c +++ b/libs/framework/src/bundle_context.c @@ -1038,6 +1038,20 @@ long celix_bundleContext_getPropertyAsLong(celix_bundle_context_t *ctx, const ch return result; } +double celix_bundleContext_getPropertyAsDouble(celix_bundle_context_t *ctx, const char *key, double defaultValue) { + double result = defaultValue; + const char *val = celix_bundleContext_getProperty(ctx, key, NULL); + if (val != NULL) { + char *enptr = NULL; + errno = 0; + double r = strtod(val, &enptr); + if (enptr != val && errno == 0) { + result = r; + } + } + return result; +} + bool celix_bundleContext_getPropertyAsBool(celix_bundle_context_t *ctx, const char *key, bool defaultValue) { bool result = defaultValue; http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/framework/src/framework.c ---------------------------------------------------------------------- diff --git a/libs/framework/src/framework.c b/libs/framework/src/framework.c index 4059e70..b0d505d 100644 --- a/libs/framework/src/framework.c +++ b/libs/framework/src/framework.c @@ -2822,7 +2822,7 @@ void celix_framework_useBundle(framework_t *fw, bool onlyActive, long bundleId, bundle_t *bnd = framework_getBundleById(fw, bundleId); if (bnd != NULL) { celix_bundle_state_e bndState = celix_bundle_getState(bnd); - if (onlyActive && bndState == OSGI_FRAMEWORK_BUNDLE_ACTIVE) { + if (onlyActive && (bndState == OSGI_FRAMEWORK_BUNDLE_ACTIVE || bndState == OSGI_FRAMEWORK_BUNDLE_STARTING)) { use(callbackHandle, bnd); } else if (!onlyActive) { use(callbackHandle, bnd); http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/utils/include/celix_properties.h ---------------------------------------------------------------------- diff --git a/libs/utils/include/celix_properties.h b/libs/utils/include/celix_properties.h index e325222..f6e871b 100644 --- a/libs/utils/include/celix_properties.h +++ b/libs/utils/include/celix_properties.h @@ -60,12 +60,16 @@ void celix_properties_unset(celix_properties_t *properties, const char *key); celix_properties_t* celix_properties_copy(const celix_properties_t *properties); long celix_properties_getAsLong(const celix_properties_t *props, const char *key, long defaultValue); - void celix_properties_setLong(celix_properties_t *props, const char *key, long value); bool celix_properties_getAsBool(celix_properties_t *props, const char *key, bool defaultValue); void celix_properties_setBool(celix_properties_t *props, const char *key, bool val); + +void celix_properties_setDouble(celix_properties_t *props, const char *key, double val); +double celix_properties_getAsDouble(const celix_properties_t *props, const char *key, double defaultValue); + + #define CELIX_PROPERTIES_FOR_EACH(props, key) \ for(hash_map_iterator_t iter = hashMapIterator_construct(props); \ hashMapIterator_hasNext(&iter), (key) = (const char*)hashMapIterator_nextKey(&iter);) http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/utils/include/celix_threads.h ---------------------------------------------------------------------- diff --git a/libs/utils/include/celix_threads.h b/libs/utils/include/celix_threads.h index a9a3049..bf6e1f6 100644 --- a/libs/utils/include/celix_threads.h +++ b/libs/utils/include/celix_threads.h @@ -54,6 +54,11 @@ static const celix_thread_t celix_thread_default = {0, 0}; celix_status_t celixThread_create(celix_thread_t *new_thread, celix_thread_attr_t *attr, celix_thread_start_t func, void *data); +/** + * If supported by the platform sets the name of the thread. + */ +void celixThread_setName(celix_thread_t *thread, const char *threadName); + void celixThread_exit(void *exitStatus); celix_status_t celixThread_detach(celix_thread_t thread); @@ -123,7 +128,7 @@ celix_status_t celixThreadCondition_destroy(celix_thread_cond_t *condition); celix_status_t celixThreadCondition_wait(celix_thread_cond_t *cond, celix_thread_mutex_t *mutex); -celix_status_t celixThreadCondition_timedwait(celix_thread_cond_t *cond, celix_thread_mutex_t *mutex, long seconds, long nanoseconds); +celix_status_t celixThreadCondition_timedwaitRelative(celix_thread_cond_t *cond, celix_thread_mutex_t *mutex, long seconds, long nanoseconds); celix_status_t celixThreadCondition_broadcast(celix_thread_cond_t *cond); http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/utils/src/celix_threads.c ---------------------------------------------------------------------- diff --git a/libs/utils/src/celix_threads.c b/libs/utils/src/celix_threads.c index d8a8091..7bfc37d 100644 --- a/libs/utils/src/celix_threads.c +++ b/libs/utils/src/celix_threads.c @@ -24,6 +24,7 @@ * \copyright Apache License, Version 2.0 */ #include <stdlib.h> +#include <sys/time.h> #include "signal.h" #include "celix_threads.h" @@ -41,6 +42,16 @@ celix_status_t celixThread_create(celix_thread_t *new_thread, celix_thread_attr_ return status; } +#ifdef _GNU_SOURCE +void celixThread_setName(celix_thread_t *thread, const char *threadName) { + pthread_setname_np(thread->thread, threadName); +} +#else +void celixThread_setName(celix_thread_t *thread __attribute__((unused)), const char *threadName __attribute__((unused))); { + //nop +} +#endif + // Returns void, since pthread_exit does exit the thread and never returns. void celixThread_exit(void *exitStatus) { pthread_exit(exitStatus); @@ -143,10 +154,11 @@ celix_status_t celixThreadCondition_wait(celix_thread_cond_t *cond, celix_thread return pthread_cond_wait(cond, mutex); } -celix_status_t celixThreadCondition_timedwait(celix_thread_cond_t *cond, celix_thread_mutex_t *mutex, long seconds, long nanoseconds) { +celix_status_t celixThreadCondition_timedwaitRelative(celix_thread_cond_t *cond, celix_thread_mutex_t *mutex, long seconds, long nanoseconds) { struct timespec time; - time.tv_sec = seconds; - time.tv_nsec = nanoseconds; + clock_gettime(CLOCK_REALTIME, &time); + time.tv_sec += seconds; + time.tv_nsec += nanoseconds; return pthread_cond_timedwait(cond, mutex, &time); } http://git-wip-us.apache.org/repos/asf/celix/blob/69596cfd/libs/utils/src/properties.c ---------------------------------------------------------------------- diff --git a/libs/utils/src/properties.c b/libs/utils/src/properties.c index 480a056..191dfd8 100644 --- a/libs/utils/src/properties.c +++ b/libs/utils/src/properties.c @@ -394,13 +394,37 @@ long celix_properties_getAsLong(const celix_properties_t *props, const char *key } void celix_properties_setLong(celix_properties_t *props, const char *key, long value) { - char buf[32]; //should be enough to store long long int - int writen = snprintf(buf, 32, "%li", value); - if (writen <= 31) { - celix_properties_set(props, key, buf); - } else { - fprintf(stderr,"buf to small for value '%li'\n", value); + char buf[32]; //should be enough to store long long int + int writen = snprintf(buf, 32, "%li", value); + if (writen <= 31) { + celix_properties_set(props, key, buf); + } else { + fprintf(stderr,"buf to small for value '%li'\n", value); + } +} + +double celix_properties_getAsDouble(const celix_properties_t *props, const char *key, double defaultValue) { + double result = defaultValue; + const char *val = celix_properties_get(props, key, NULL); + if (val != NULL) { + char *enptr = NULL; + errno = 0; + double r = strtod(val, &enptr); + if (enptr != val && errno == 0) { + result = r; + } } + return result; +} + +void celix_properties_setDouble(celix_properties_t *props, const char *key, double val) { + char buf[32]; //should be enough to store long long int + int writen = snprintf(buf, 32, "%f", val); + if (writen <= 31) { + celix_properties_set(props, key, buf); + } else { + fprintf(stderr,"buf to small for value '%f'\n", val); + } } bool celix_properties_getAsBool(celix_properties_t *props, const char *key, bool defaultValue) {