Repository: celix Updated Branches: refs/heads/develop 83018f069 -> 42545f1a9
http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_serializer_json/private/src/ps_activator.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_serializer_json/private/src/ps_activator.c b/pubsub/pubsub_serializer_json/private/src/ps_activator.c index e0e23d4..fec5892 100644 --- a/pubsub/pubsub_serializer_json/private/src/ps_activator.c +++ b/pubsub/pubsub_serializer_json/private/src/ps_activator.c @@ -68,7 +68,11 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) pubsubSerializerSvc->destroySerializerMap = (void*)pubsubSerializer_destroySerializerMap; activator->serializerService = pubsubSerializerSvc; - status = bundleContext_registerService(context, PUBSUB_SERIALIZER_SERVICE, pubsubSerializerSvc, NULL, &activator->registration); + /* Set serializer type */ + properties_pt props = properties_create(); + properties_set(props,PUBSUB_SERIALIZER_TYPE_KEY,PUBSUB_SERIALIZER_TYPE); + + status = bundleContext_registerService(context, PUBSUB_SERIALIZER_SERVICE, pubsubSerializerSvc, props, &activator->registration); } http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c index fb46f10..cffc816 100644 --- a/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c +++ b/pubsub/pubsub_serializer_json/private/src/pubsub_serializer_impl.c @@ -79,206 +79,217 @@ celix_status_t pubsubSerializer_destroy(pubsub_serializer_t* serializer) { return status; } -celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* serializer, bundle_pt bundle, pubsub_msg_serializer_map_t** out) { - celix_status_t status = CELIX_SUCCESS; - pubsub_msg_serializer_map_t* map = calloc(1, 
sizeof(*map)); - if (map != NULL) { - map->bundle = bundle; - map->serializers = hashMap_create(NULL, NULL, NULL, NULL); - pubsubSerializer_fillMsgSerializerMap(map->serializers, bundle); - } else { - logHelper_log(serializer->loghelper, OSGI_LOGSERVICE_ERROR, "Cannot allocate memory for msg map"); - status = CELIX_ENOMEM; - } - - if (status == CELIX_SUCCESS) { - *out = map; - } - return status; +celix_status_t pubsubSerializer_createSerializerMap(pubsub_serializer_t* serializer, bundle_pt bundle, hash_map_pt* serializerMap) { + celix_status_t status = CELIX_SUCCESS; + + hash_map_pt map = hashMap_create(NULL, NULL, NULL, NULL); + + if (map != NULL) { + pubsubSerializer_fillMsgSerializerMap(map, bundle); + } else { + logHelper_log(serializer->loghelper, OSGI_LOGSERVICE_ERROR, "Cannot allocate memory for msg map"); + status = CELIX_ENOMEM; + } + + if (status == CELIX_SUCCESS) { + *serializerMap = map; + } + return status; } -celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t* serializer, pubsub_msg_serializer_map_t* map) { - celix_status_t status = CELIX_SUCCESS; - if (map == NULL) { - return status; - } - - hash_map_iterator_t iter = hashMapIterator_construct(map->serializers); - while (hashMapIterator_hasNext(&iter)) { - pubsub_msg_serializer_t* msgSer = hashMapIterator_nextValue(&iter); - pubsub_msg_serializer_impl_t* impl = msgSer->handle; - dynMessage_destroy(impl->dynMsg); //note msgSer->name and msgSer->version owned by dynType - free(impl); //also contains the service struct. 
- } - hashMap_destroy(map->serializers, false, false); - free(map); - return status; +celix_status_t pubsubSerializer_destroySerializerMap(pubsub_serializer_t* serializer, hash_map_pt serializerMap) { + celix_status_t status = CELIX_SUCCESS; + if (serializerMap == NULL) { + return CELIX_ILLEGAL_ARGUMENT; + } + + hash_map_iterator_t iter = hashMapIterator_construct(serializerMap); + while (hashMapIterator_hasNext(&iter)) { + pubsub_msg_serializer_t* msgSerializer = hashMapIterator_nextValue(&iter); + dyn_message_type *dynMsg = (dyn_message_type*)msgSerializer->handle; + dynMessage_destroy(dynMsg); //note msgSer->name and msgSer->version owned by dynType + free(msgSerializer); //also contains the service struct. + } + + hashMap_destroy(serializerMap, false, false); + + return status; } -celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_impl_t* impl, const void* msg, char** out, size_t *outLen) { - celix_status_t status = CELIX_SUCCESS; +celix_status_t pubsubMsgSerializer_serialize(pubsub_msg_serializer_t* msgSerializer, const void* msg, void** out, size_t *outLen) { + celix_status_t status = CELIX_SUCCESS; + + char *jsonOutput = NULL; + dyn_type* dynType = NULL; + dyn_message_type *dynMsg = (dyn_message_type*)msgSerializer->handle; + dynMessage_getMessageType(dynMsg, &dynType); - char *jsonOutput = NULL; - dyn_type* dynType = NULL; - dynMessage_getMessageType(impl->dynMsg, &dynType); - int rc = jsonSerializer_serialize(dynType, msg, &jsonOutput); - if (rc != 0){ + if (jsonSerializer_serialize(dynType, msg, &jsonOutput) != 0){ status = CELIX_BUNDLE_EXCEPTION; } - if (status == CELIX_SUCCESS) { - *out = jsonOutput; - *outLen = strlen(jsonOutput) + 1; - } + + if (status == CELIX_SUCCESS) { + *out = jsonOutput; + *outLen = strlen(jsonOutput) + 1; + } return status; } -celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_impl_t* impl, const char* input, size_t inputLen, void **out) { - celix_status_t status = CELIX_SUCCESS; - void *msg = 
NULL; - dyn_type* dynType = NULL; - dynMessage_getMessageType(impl->dynMsg, &dynType); - int rc = jsonSerializer_deserialize(dynType, input, &msg); - if (rc != 0) { - status = CELIX_BUNDLE_EXCEPTION; - } - else{ - *out = msg; - } +celix_status_t pubsubMsgSerializer_deserialize(pubsub_msg_serializer_t* msgSerializer, const void* input, size_t inputLen, void **out) { + + celix_status_t status = CELIX_SUCCESS; + void *msg = NULL; + dyn_type* dynType = NULL; + dyn_message_type *dynMsg = (dyn_message_type*)msgSerializer->handle; + dynMessage_getMessageType(dynMsg, &dynType); + + if (jsonSerializer_deserialize(dynType, (const char*)input, &msg) != 0) { + status = CELIX_BUNDLE_EXCEPTION; + } + else{ + *out = msg; + } + return status; } -void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_impl_t* impl, void *msg) { - dyn_type* dynType = NULL; - dynMessage_getMessageType(impl->dynMsg, &dynType); - if (dynType != NULL) { - dynType_free(dynType, msg); - } +void pubsubMsgSerializer_freeMsg(pubsub_msg_serializer_t* msgSerializer, void *msg) { + dyn_type* dynType = NULL; + dyn_message_type *dynMsg = (dyn_message_type*)msgSerializer->handle; + dynMessage_getMessageType(dynMsg, &dynType); + if (dynType != NULL) { + dynType_free(dynType, msg); + } } static void pubsubSerializer_fillMsgSerializerMap(hash_map_pt msgSerializers, bundle_pt bundle) { - char* root = NULL; - char* metaInfPath = NULL; + char* root = NULL; + char* metaInfPath = NULL; - root = pubsubSerializer_getMsgDescriptionDir(bundle); + root = pubsubSerializer_getMsgDescriptionDir(bundle); - if(root != NULL){ - asprintf(&metaInfPath, "%s/META-INF/descriptors/messages", root); + if(root != NULL){ + asprintf(&metaInfPath, "%s/META-INF/descriptors", root); - pubsubSerializer_addMsgSerializerFromBundle(root, bundle, msgSerializers); - pubsubSerializer_addMsgSerializerFromBundle(metaInfPath, bundle, msgSerializers); + pubsubSerializer_addMsgSerializerFromBundle(root, bundle, msgSerializers); + 
pubsubSerializer_addMsgSerializerFromBundle(metaInfPath, bundle, msgSerializers); - free(metaInfPath); - free(root); - } + free(metaInfPath); + free(root); + } } static char* pubsubSerializer_getMsgDescriptionDir(bundle_pt bundle) { - char *root = NULL; + char *root = NULL; - bool isSystemBundle = false; - bundle_isSystemBundle(bundle, &isSystemBundle); + bool isSystemBundle = false; + bundle_isSystemBundle(bundle, &isSystemBundle); - if(isSystemBundle == true) { - bundle_context_pt context; - bundle_getContext(bundle, &context); + if(isSystemBundle == true) { + bundle_context_pt context; + bundle_getContext(bundle, &context); - const char *prop = NULL; + const char *prop = NULL; - bundleContext_getProperty(context, SYSTEM_BUNDLE_ARCHIVE_PATH, &prop); + bundleContext_getProperty(context, SYSTEM_BUNDLE_ARCHIVE_PATH, &prop); - if(prop != NULL) { - root = strdup(prop); - } else { - root = getcwd(NULL, 0); - } - } else { - bundle_getEntry(bundle, ".", &root); - } + if(prop != NULL) { + root = strdup(prop); + } else { + root = getcwd(NULL, 0); + } + } else { + bundle_getEntry(bundle, ".", &root); + } - return root; + return root; } static void pubsubSerializer_addMsgSerializerFromBundle(const char *root, bundle_pt bundle, hash_map_pt msgSerializers) { - char path[128]; - struct dirent *entry = NULL; - DIR *dir = opendir(root); - - if(dir) { - entry = readdir(dir); - } - - while (entry != NULL) { - - if (strstr(entry->d_name, ".descriptor") != NULL) { - - printf("DMU: Parsing entry '%s'\n", entry->d_name); - - memset(path,0,128); - snprintf(path, 128, "%s/%s", root, entry->d_name); - FILE *stream = fopen(path,"r"); - - if (stream != NULL){ - dyn_message_type* msgType = NULL; - - int rc = dynMessage_parse(stream, &msgType); - if (rc == 0 && msgType != NULL) { - - char* msgName = NULL; - rc += dynMessage_getName(msgType,&msgName); - - version_pt msgVersion = NULL; - rc += dynMessage_getVersion(msgType, &msgVersion); - - if(rc == 0 && msgName != NULL && msgVersion != NULL){ 
- - unsigned int msgId = utils_stringHash(msgName); - - pubsub_msg_serializer_impl_t* impl = calloc(1, sizeof(*impl)); - impl->dynMsg = msgType; - impl->msgSerializer.handle = impl; - impl->msgSerializer.msgId = msgId; - impl->msgSerializer.msgName = msgName; - impl->msgSerializer.msgVersion = msgVersion; - impl->msgSerializer.serialize = (void*) pubsubMsgSerializer_serialize; - impl->msgSerializer.deserialize = (void*) pubsubMsgSerializer_deserialize; - impl->msgSerializer.freeMsg = (void*) pubsubMsgSerializer_freeMsg; - - bool clash = hashMap_containsKey(msgSerializers, (void*)(uintptr_t)msgId); - if (clash) { - printf("Cannot add msg %s. clash in msg id %d!!\n", msgName, msgId); - free(impl); - dynMessage_destroy(msgType); - } else if (msgId != 0) { - hashMap_put(msgSerializers, (void*)(uintptr_t)msgId, &impl->msgSerializer); - } else { - printf("Error creating msg serializer\n"); - free(impl); - dynMessage_destroy(msgType); - } - } - else{ - printf("Cannot retrieve name and/or version from msg\n"); - } - - } else{ - printf("DMU: cannot parse message from descriptor %s\n.",path); - } - fclose(stream); - }else{ - printf("DMU: cannot open descriptor file %s\n.",path); - } - - } - entry = readdir(dir); - } - - if(dir) { - closedir(dir); - } + char path[128]; + struct dirent *entry = NULL; + DIR *dir = opendir(root); + + if(dir) { + entry = readdir(dir); + } + + while (entry != NULL) { + + if (strstr(entry->d_name, ".descriptor") != NULL) { + + printf("DMU: Parsing entry '%s'\n", entry->d_name); + + memset(path,0,128); + snprintf(path, 128, "%s/%s", root, entry->d_name); + FILE *stream = fopen(path,"r"); + + if (stream != NULL){ + dyn_message_type* msgType = NULL; + + int rc = dynMessage_parse(stream, &msgType); + if (rc == 0 && msgType != NULL) { + + char* msgName = NULL; + rc += dynMessage_getName(msgType,&msgName); + + version_pt msgVersion = NULL; + rc += dynMessage_getVersion(msgType, &msgVersion); + + if(rc == 0 && msgName != NULL && msgVersion != NULL){ + + 
unsigned int msgId = utils_stringHash(msgName); + + pubsub_msg_serializer_t *msgSerializer = calloc(1,sizeof(pubsub_msg_serializer_t)); + + msgSerializer->handle = msgType; + msgSerializer->msgId = msgId; + msgSerializer->msgName = msgName; + msgSerializer->msgVersion = msgVersion; + msgSerializer->serialize = (void*) pubsubMsgSerializer_serialize; + msgSerializer->deserialize = (void*) pubsubMsgSerializer_deserialize; + msgSerializer->freeMsg = (void*) pubsubMsgSerializer_freeMsg; + + bool clash = hashMap_containsKey(msgSerializers, (void*)(uintptr_t)msgId); + if (clash){ + printf("Cannot add msg %s. clash in msg id %d!!\n", msgName, msgId); + free(msgSerializer); + dynMessage_destroy(msgType); + } + else if (msgId != 0){ + printf("Adding %u : %s\n", msgId, msgName); + hashMap_put(msgSerializers, (void*)(uintptr_t)msgId, msgSerializer); + } + else{ + printf("Error creating msg serializer\n"); + free(msgSerializer); + dynMessage_destroy(msgType); + } + + } + else{ + printf("Cannot retrieve name and/or version from msg\n"); + } + + } else{ + printf("DMU: cannot parse message from descriptor %s\n.",path); + } + fclose(stream); + }else{ + printf("DMU: cannot open descriptor file %s\n.",path); + } + + } + entry = readdir(dir); + } + + if(dir) { + closedir(dir); + } } http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h b/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h index c7cb100..7614e0c 100644 --- a/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h +++ b/pubsub/pubsub_topology_manager/private/include/pubsub_topology_manager.h @@ -41,9 +41,6 @@ struct pubsub_topology_manager { bundle_context_pt context; - celix_thread_mutex_t serializerListLock; - array_list_pt serializerList; - 
celix_thread_mutex_t psaListLock; array_list_pt psaList; @@ -65,22 +62,14 @@ celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_help celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager); celix_status_t pubsub_topologyManager_closeImports(pubsub_topology_manager_pt manager); -celix_status_t pubsub_topologyManager_pubsubSerializerAdding(void *handle, service_reference_pt reference, void **service); -celix_status_t pubsub_topologyManager_pubsubSerializerAdded(void *handle, service_reference_pt reference, void *service); -celix_status_t pubsub_topologyManager_pubsubSerializerModified(void *handle, service_reference_pt reference, void *service); -celix_status_t pubsub_topologyManager_pubsubSerializerRemoved(void *handle, service_reference_pt reference, void *service); - -celix_status_t pubsub_topologyManager_psaAdding(void *handle, service_reference_pt reference, void **service); celix_status_t pubsub_topologyManager_psaAdded(void *handle, service_reference_pt reference, void *service); celix_status_t pubsub_topologyManager_psaModified(void *handle, service_reference_pt reference, void *service); celix_status_t pubsub_topologyManager_psaRemoved(void *handle, service_reference_pt reference, void *service); -celix_status_t pubsub_topologyManager_pubsubDiscoveryAdding(void* handle, service_reference_pt reference, void** service); celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service_reference_pt reference, void* service); celix_status_t pubsub_topologyManager_pubsubDiscoveryModified(void * handle, service_reference_pt reference, void* service); celix_status_t pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, service_reference_pt reference, void* service); -celix_status_t pubsub_topologyManager_subscriberAdding(void * handle, service_reference_pt reference, void **service); celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_reference_pt reference, void * 
service); celix_status_t pubsub_topologyManager_subscriberModified(void * handle, service_reference_pt reference, void * service); celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_reference_pt reference, void * service); http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_topology_manager/private/src/pstm_activator.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_topology_manager/private/src/pstm_activator.c b/pubsub/pubsub_topology_manager/private/src/pstm_activator.c index c202e7a..0ce2571 100644 --- a/pubsub/pubsub_topology_manager/private/src/pstm_activator.c +++ b/pubsub/pubsub_topology_manager/private/src/pstm_activator.c @@ -48,7 +48,6 @@ struct activator { pubsub_topology_manager_pt manager; - service_tracker_pt pubsubSerializerTracker; service_tracker_pt pubsubDiscoveryTracker; service_tracker_pt pubsubAdminTracker; service_tracker_pt pubsubSubscribersTracker; @@ -62,38 +61,19 @@ struct activator { log_helper_pt loghelper; }; -static celix_status_t bundleActivator_createPSSTracker(struct activator *activator, service_tracker_pt *tracker); + static celix_status_t bundleActivator_createPSDTracker(struct activator *activator, service_tracker_pt *tracker); static celix_status_t bundleActivator_createPSATracker(struct activator *activator, service_tracker_pt *tracker); static celix_status_t bundleActivator_createPSSubTracker(struct activator *activator, service_tracker_pt *tracker); -static celix_status_t bundleActivator_createPSSTracker(struct activator *activator, service_tracker_pt *tracker) { - celix_status_t status; - - service_tracker_customizer_pt customizer = NULL; - - status = serviceTrackerCustomizer_create(activator->manager, - pubsub_topologyManager_pubsubSerializerAdding, - pubsub_topologyManager_pubsubSerializerAdded, - pubsub_topologyManager_pubsubSerializerModified, - pubsub_topologyManager_pubsubSerializerRemoved, - &customizer); - - if (status 
== CELIX_SUCCESS) { - status = serviceTracker_create(activator->context, (char *) PUBSUB_SERIALIZER_SERVICE, customizer, tracker); - } - - return status; -} - static celix_status_t bundleActivator_createPSDTracker(struct activator *activator, service_tracker_pt *tracker) { celix_status_t status; service_tracker_customizer_pt customizer = NULL; status = serviceTrackerCustomizer_create(activator->manager, - pubsub_topologyManager_pubsubDiscoveryAdding, + NULL, pubsub_topologyManager_pubsubDiscoveryAdded, pubsub_topologyManager_pubsubDiscoveryModified, pubsub_topologyManager_pubsubDiscoveryRemoved, @@ -112,7 +92,7 @@ static celix_status_t bundleActivator_createPSATracker(struct activator *activat service_tracker_customizer_pt customizer = NULL; status = serviceTrackerCustomizer_create(activator->manager, - pubsub_topologyManager_psaAdding, + NULL, pubsub_topologyManager_psaAdded, pubsub_topologyManager_psaModified, pubsub_topologyManager_psaRemoved, @@ -131,7 +111,7 @@ static celix_status_t bundleActivator_createPSSubTracker(struct activator *activ service_tracker_customizer_pt customizer = NULL; status = serviceTrackerCustomizer_create(activator->manager, - pubsub_topologyManager_subscriberAdding, + NULL, pubsub_topologyManager_subscriberAdded, pubsub_topologyManager_subscriberModified, pubsub_topologyManager_subscriberRemoved, @@ -163,36 +143,15 @@ celix_status_t bundleActivator_create(bundle_context_pt context, void **userData if (status == CELIX_SUCCESS) { status = bundleActivator_createPSDTracker(activator, &activator->pubsubDiscoveryTracker); if (status == CELIX_SUCCESS) { - status = bundleActivator_createPSSTracker(activator, &activator->pubsubSerializerTracker); - if (status == CELIX_SUCCESS){ - status = bundleActivator_createPSATracker(activator, &activator->pubsubAdminTracker); + status = bundleActivator_createPSATracker(activator, &activator->pubsubAdminTracker); + if (status == CELIX_SUCCESS) { + status = bundleActivator_createPSSubTracker(activator, 
&activator->pubsubSubscribersTracker); if (status == CELIX_SUCCESS) { - status = bundleActivator_createPSSubTracker(activator, &activator->pubsubSubscribersTracker); - if (status == CELIX_SUCCESS) { - *userData = activator; - } - if (status != CELIX_SUCCESS){ - serviceTracker_destroy(activator->pubsubAdminTracker); - } - } - if (status != CELIX_SUCCESS){ - serviceTracker_destroy(activator->pubsubSerializerTracker); + *userData = activator; } } - if (status != CELIX_SUCCESS){ - serviceTracker_destroy(activator->pubsubDiscoveryTracker); - } - } - if (status != CELIX_SUCCESS){ - pubsub_topologyManager_destroy(activator->manager); } } - if (status != CELIX_SUCCESS){ // an exception occurred so free allocated memory - logHelper_stop(activator->loghelper); - logHelper_destroy(&activator->loghelper); - free(activator); - - } return status; } @@ -224,12 +183,13 @@ celix_status_t bundleActivator_start(void * userData, bundle_context_pt context) properties_set(props, (char *) OSGI_RSA_SERVICE_EXPORTED_INTERFACES, (char *) PUBSUB_TOPIC_INFO_SERVICE); status += bundleContext_registerService(context, (char *) PUBSUB_TOPIC_INFO_SERVICE, activator->topicInfo, props, &activator->topicInfoService); */ - - status += serviceTracker_open(activator->pubsubSerializerTracker); status += serviceTracker_open(activator->pubsubAdminTracker); + status += serviceTracker_open(activator->pubsubDiscoveryTracker); + status += serviceTracker_open(activator->pubsubSubscribersTracker); + return status; } @@ -239,7 +199,6 @@ celix_status_t bundleActivator_stop(void * userData, bundle_context_pt context) serviceTracker_close(activator->pubsubSubscribersTracker); serviceTracker_close(activator->pubsubDiscoveryTracker); - serviceTracker_close(activator->pubsubSerializerTracker); serviceTracker_close(activator->pubsubAdminTracker); serviceRegistration_unregister(activator->publisherEPDiscoverService); @@ -261,7 +220,6 @@ celix_status_t bundleActivator_destroy(void * userData, bundle_context_pt contex 
serviceTracker_destroy(activator->pubsubSubscribersTracker); serviceTracker_destroy(activator->pubsubDiscoveryTracker); - serviceTracker_destroy(activator->pubsubSerializerTracker); serviceTracker_destroy(activator->pubsubAdminTracker); logHelper_stop(activator->loghelper); http://git-wip-us.apache.org/repos/asf/celix/blob/3b99cc34/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c ---------------------------------------------------------------------- diff --git a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c index 0e7923b..b4e8f46 100644 --- a/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c +++ b/pubsub/pubsub_topology_manager/private/src/pubsub_topology_manager.c @@ -23,7 +23,6 @@ * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a> * \copyright Apache License, Version 2.0 */ - #include <stdio.h> #include <stdlib.h> #include <string.h> @@ -66,15 +65,13 @@ celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_help celixThreadMutexAttr_create(&psaAttr); celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE); status = celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr); - celixThreadMutexAttr_destroy(&psaAttr); + celixThreadMutexAttr_destroy(&psaAttr); - status = celixThreadMutex_create(&(*manager)->publicationsLock, NULL); + status = celixThreadMutex_create(&(*manager)->publicationsLock, NULL); status = celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL); status = celixThreadMutex_create(&(*manager)->discoveryListLock, NULL); - status = celixThreadMutex_create(&(*manager)->serializerListLock, NULL); arrayList_create(&(*manager)->psaList); - arrayList_create(&(*manager)->serializerList); (*manager)->discoveryList = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL); (*manager)->publications = hashMap_create(utils_stringHash, NULL, 
utils_stringEquals, NULL); @@ -98,11 +95,6 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager celixThreadMutex_unlock(&manager->psaListLock); celixThreadMutex_destroy(&manager->psaListLock); - celixThreadMutex_lock(&manager->serializerListLock); - arrayList_destroy(manager->serializerList); - celixThreadMutex_unlock(&manager->serializerListLock); - celixThreadMutex_destroy(&manager->serializerListLock); - celixThreadMutex_lock(&manager->publicationsLock); hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications); while(hashMapIterator_hasNext(pubit)){ @@ -138,24 +130,18 @@ celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager return status; } - -celix_status_t pubsub_topologyManager_psaAdding(void * handle, service_reference_pt reference, void **service) { - celix_status_t status = CELIX_SUCCESS; - pubsub_topology_manager_pt manager = handle; - - status = bundleContext_getService(manager->context, reference, service); - - return status; -} - celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_pt reference, void * service) { celix_status_t status = CELIX_SUCCESS; pubsub_topology_manager_pt manager = handle; - int i, j; + int i; - pubsub_admin_service_pt new_psa = (pubsub_admin_service_pt) service; + pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service; logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added PSA"); + celixThreadMutex_lock(&manager->psaListLock); + arrayList_add(manager->psaList, psa); + celixThreadMutex_unlock(&manager->psaListLock); + // Add already detected subscriptions to new PSA celixThreadMutex_lock(&manager->subscriptionsLock); hash_map_iterator_pt subscriptionsIterator = hashMapIterator_create(manager->subscriptions); @@ -163,27 +149,7 @@ celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_ while (hashMapIterator_hasNext(subscriptionsIterator)) { array_list_pt sub_ep_list = 
hashMapIterator_nextValue(subscriptionsIterator); for(i=0;i<arrayList_size(sub_ep_list);i++){ - pubsub_endpoint_pt sub = (pubsub_endpoint_pt)arrayList_get(sub_ep_list,i); - double new_psa_score; - new_psa->matchSubscriber(new_psa->admin, sub, &new_psa_score); - pubsub_admin_service_pt best_psa = NULL; - double highest_score = 0; - - for(j=0;j<arrayList_size(manager->psaList);j++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j); - double score; - psa->matchSubscriber(psa->admin, sub, &score); - if (score > highest_score){ - highest_score = score; - best_psa = psa; - } - } - if (best_psa != NULL && (new_psa_score > highest_score)){ - best_psa->removeSubscription(best_psa->admin, sub); - } - if (new_psa_score > highest_score){ - status += new_psa->addSubscription(new_psa->admin, sub); - } + status += psa->addSubscription(psa->admin, (pubsub_endpoint_pt)arrayList_get(sub_ep_list,i)); } } @@ -198,27 +164,7 @@ celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_ while (hashMapIterator_hasNext(publicationsIterator)) { array_list_pt pub_ep_list = hashMapIterator_nextValue(publicationsIterator); for(i=0;i<arrayList_size(pub_ep_list);i++){ - pubsub_endpoint_pt pub = (pubsub_endpoint_pt)arrayList_get(pub_ep_list,i); - double new_psa_score; - new_psa->matchPublisher(new_psa->admin, pub, &new_psa_score); - pubsub_admin_service_pt best_psa = NULL; - double highest_score = 0; - - for(j=0;j<arrayList_size(manager->psaList);j++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j); - double score; - psa->matchPublisher(psa->admin, pub, &score); - if (score > highest_score){ - highest_score = score; - best_psa = psa; - } - } - if (best_psa != NULL && (new_psa_score > highest_score)){ - best_psa->removePublication(best_psa->admin, pub); - } - if (new_psa_score > highest_score){ - status += new_psa->addPublication(new_psa->admin, pub); - } + status += 
psa->addPublication(psa->admin, (pubsub_endpoint_pt)arrayList_get(pub_ep_list,i)); } } @@ -226,20 +172,6 @@ celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_ celixThreadMutex_unlock(&manager->publicationsLock); - - celixThreadMutex_lock(&manager->serializerListLock); - unsigned int size = arrayList_size(manager->serializerList); - if (size > 0) { - pubsub_serializer_service_t* ser = arrayList_get(manager->serializerList, (size-1)); //last, same as result of add/remove serializer - new_psa->setSerializer(new_psa->admin, ser); - } - celixThreadMutex_unlock(&manager->serializerListLock); - - celixThreadMutex_lock(&manager->psaListLock); - arrayList_add(manager->psaList, new_psa); - celixThreadMutex_unlock(&manager->psaListLock); - - return status; } @@ -325,97 +257,13 @@ celix_status_t pubsub_topologyManager_psaRemoved(void * handle, service_referenc return status; } -celix_status_t pubsub_topologyManager_pubsubSerializerAdding(void* handle, service_reference_pt reference, void** service) { - celix_status_t status = CELIX_SUCCESS; - pubsub_topology_manager_pt manager = handle; - - bundleContext_getService(manager->context, reference, service); - - return status; -} - -celix_status_t pubsub_topologyManager_pubsubSerializerAdded(void* handle, service_reference_pt reference, void* service) { - celix_status_t status = CELIX_SUCCESS; - - pubsub_topology_manager_pt manager = handle; - pubsub_serializer_service_t* new_serializer = (pubsub_serializer_service_t*) service; - - celixThreadMutex_lock(&manager->serializerListLock); - - logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added pubsub serializer"); - - int i; - for(i=0; i<arrayList_size(manager->psaList); i++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt) arrayList_get(manager->psaList,i); - psa->setSerializer(psa->admin, new_serializer); - } - - arrayList_add(manager->serializerList, new_serializer); - - celixThreadMutex_unlock(&manager->serializerListLock); - 
- return status; -} - -celix_status_t pubsub_topologyManager_pubsubSerializerModified(void * handle, service_reference_pt reference, void * service) { - celix_status_t status = CELIX_SUCCESS; - - // Nop... - - return status; -} - -celix_status_t pubsub_topologyManager_pubsubSerializerRemoved(void * handle, service_reference_pt reference, void * service) { - celix_status_t status = CELIX_SUCCESS; - - pubsub_topology_manager_pt manager = handle; - pubsub_serializer_service_t* new_serializer = (pubsub_serializer_service_t*) service; - - celixThreadMutex_lock(&manager->serializerListLock); - - logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Removed pubsub serializer"); - - int i, j; - - for(i=0; i<arrayList_size(manager->psaList); i++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt) arrayList_get(manager->psaList,i); - psa->removeSerializer(psa->admin, new_serializer); - } - - arrayList_removeElement(manager->serializerList, new_serializer); - - if (arrayList_size(manager->serializerList) > 0){ - //there is another serializer available, change the admin so it is using another serializer - pubsub_serializer_service_t* replacing_serializer = (pubsub_serializer_service_t*) arrayList_get(manager->serializerList,0); - - for(j=0; j<arrayList_size(manager->psaList); j++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt) arrayList_get(manager->psaList,j); - psa->setSerializer(psa->admin, replacing_serializer); - } - } - - celixThreadMutex_unlock(&manager->serializerListLock); - - - return status; -} - -celix_status_t pubsub_topologyManager_subscriberAdding(void * handle, service_reference_pt reference, void **service) { - celix_status_t status = CELIX_SUCCESS; - pubsub_topology_manager_pt manager = handle; - - status = bundleContext_getService(manager->context, reference, service); - - return status; -} - celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_reference_pt reference, void * service) { celix_status_t 
status = CELIX_SUCCESS; pubsub_topology_manager_pt manager = handle; //subscriber_service_pt subscriber = (subscriber_service_pt)service; pubsub_endpoint_pt sub = NULL; - if(pubsubEndpoint_createFromServiceReference(reference,&sub) == CELIX_SUCCESS){ + if(pubsubEndpoint_createFromServiceReference(reference,&sub,false) == CELIX_SUCCESS){ celixThreadMutex_lock(&manager->subscriptionsLock); char *sub_key = createScopeTopicKey(sub->scope, sub->topic); @@ -430,40 +278,40 @@ celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_ref celixThreadMutex_unlock(&manager->subscriptionsLock); int j; - celixThreadMutex_lock(&manager->psaListLock); - double highest_score = -1; + double score = 0; + double best_score = 0; pubsub_admin_service_pt best_psa = NULL; - + celixThreadMutex_lock(&manager->psaListLock); for(j=0;j<arrayList_size(manager->psaList);j++){ pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j); - double score; - psa->matchSubscriber(psa->admin, sub, &score); - if (score > highest_score){ - highest_score = score; + psa->matchEndpoint(psa->admin,sub,&score); + if(score>best_score){ /* We have a new winner! 
*/ + best_score = score; best_psa = psa; } } - if (best_psa != NULL){ + + if(best_psa != NULL && best_score>0){ best_psa->addSubscription(best_psa->admin,sub); } // Inform discoveries for interest in the topic - celixThreadMutex_lock(&manager->discoveryListLock); + celixThreadMutex_lock(&manager->discoveryListLock); hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList); - while(hashMapIterator_hasNext(iter)){ - service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); - publisher_endpoint_announce_pt disc = NULL; - bundleContext_getService(manager->context, disc_sr, (void**) &disc); - disc->interestedInTopic(disc->handle, sub->scope, sub->topic); - bundleContext_ungetService(manager->context, disc_sr, NULL); - } - hashMapIterator_destroy(iter); - celixThreadMutex_unlock(&manager->discoveryListLock); + while(hashMapIterator_hasNext(iter)){ + service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); + publisher_endpoint_announce_pt disc = NULL; + bundleContext_getService(manager->context, disc_sr, (void**) &disc); + disc->interestedInTopic(disc->handle, sub->scope, sub->topic); + bundleContext_ungetService(manager->context, disc_sr, NULL); + } + hashMapIterator_destroy(iter); + celixThreadMutex_unlock(&manager->discoveryListLock); celixThreadMutex_unlock(&manager->psaListLock); } else{ - status = CELIX_INVALID_BUNDLE_CONTEXT; + status=CELIX_INVALID_BUNDLE_CONTEXT; } return status; @@ -482,25 +330,25 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r pubsub_topology_manager_pt manager = handle; pubsub_endpoint_pt subcmp = NULL; - if(pubsubEndpoint_createFromServiceReference(reference,&subcmp) == CELIX_SUCCESS){ + if(pubsubEndpoint_createFromServiceReference(reference,&subcmp,false) == CELIX_SUCCESS){ int j,k; // Inform discoveries that we not interested in the topic any more - celixThreadMutex_lock(&manager->discoveryListLock); - hash_map_iterator_pt iter = 
hashMapIterator_create(manager->discoveryList); - while(hashMapIterator_hasNext(iter)){ - service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); - publisher_endpoint_announce_pt disc = NULL; - bundleContext_getService(manager->context, disc_sr, (void**) &disc); - disc->uninterestedInTopic(disc->handle, subcmp->scope, subcmp->topic); - bundleContext_ungetService(manager->context, disc_sr, NULL); - } - hashMapIterator_destroy(iter); - celixThreadMutex_unlock(&manager->discoveryListLock); - - celixThreadMutex_lock(&manager->subscriptionsLock); - celixThreadMutex_lock(&manager->psaListLock); + celixThreadMutex_lock(&manager->discoveryListLock); + hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList); + while(hashMapIterator_hasNext(iter)){ + service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); + publisher_endpoint_announce_pt disc = NULL; + bundleContext_getService(manager->context, disc_sr, (void**) &disc); + disc->uninterestedInTopic(disc->handle, subcmp->scope, subcmp->topic); + bundleContext_ungetService(manager->context, disc_sr, NULL); + } + hashMapIterator_destroy(iter); + celixThreadMutex_unlock(&manager->discoveryListLock); + + celixThreadMutex_lock(&manager->subscriptionsLock); + celixThreadMutex_lock(&manager->psaListLock); char *sub_key = createScopeTopicKey(subcmp->scope,subcmp->topic); array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key); @@ -509,19 +357,10 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r for(j=0;j<arrayList_size(sub_list_by_topic);j++){ pubsub_endpoint_pt sub = arrayList_get(sub_list_by_topic,j); if(pubsubEndpoint_equals(sub,subcmp)){ - double highest_score = -1; - pubsub_admin_service_pt best_psa = NULL; for(k=0;k<arrayList_size(manager->psaList);k++){ + /* No problem with invoking removal on all psa's, only the one that manage this topic will do something */ pubsub_admin_service_pt psa = 
(pubsub_admin_service_pt)arrayList_get(manager->psaList,k); - double score; - psa->matchSubscriber(psa->admin, sub, &score); - if (score > highest_score){ - highest_score = score; - best_psa = psa; - } - } - if (best_psa != NULL){ - best_psa->removeSubscription(best_psa->admin,sub); + psa->removeSubscription(psa->admin,sub); } } @@ -547,22 +386,13 @@ celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_r } else{ - status = CELIX_INVALID_BUNDLE_CONTEXT; + status=CELIX_INVALID_BUNDLE_CONTEXT; } return status; } -celix_status_t pubsub_topologyManager_pubsubDiscoveryAdding(void* handle, service_reference_pt reference, void** service) { - celix_status_t status = CELIX_SUCCESS; - pubsub_topology_manager_pt manager = handle; - - bundleContext_getService(manager->context, reference, service); - - return status; -} - celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service_reference_pt reference, void* service) { celix_status_t status = CELIX_SUCCESS; pubsub_topology_manager_pt manager = (pubsub_topology_manager_pt)handle; @@ -600,16 +430,16 @@ celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service iter = hashMapIterator_create(manager->subscriptions); while(hashMapIterator_hasNext(iter)) { - array_list_pt l = (array_list_pt)hashMapIterator_nextValue(iter); - int i; - for(i=0;i<arrayList_size(l);i++){ - pubsub_endpoint_pt subEp = (pubsub_endpoint_pt)arrayList_get(l,i); + array_list_pt l = (array_list_pt)hashMapIterator_nextValue(iter); + int i; + for(i=0;i<arrayList_size(l);i++){ + pubsub_endpoint_pt subEp = (pubsub_endpoint_pt)arrayList_get(l,i); - disc->interestedInTopic(disc->handle, subEp->scope, subEp->topic); - } + disc->interestedInTopic(disc->handle, subEp->scope, subEp->topic); + } } hashMapIterator_destroy(iter); - celixThreadMutex_unlock(&manager->subscriptionsLock); + celixThreadMutex_unlock(&manager->subscriptionsLock); return status; } @@ -654,81 +484,57 @@ celix_status_t 
pubsub_topologyManager_publisherTrackerAdded(void *handle, array_ listener_hook_info_pt info = arrayList_get(listeners, l_index); - const char* fwUUID=NULL; - bundleContext_getProperty(info->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); + pubsub_endpoint_pt pub = NULL; + if(pubsubEndpoint_createFromListenerHookInfo(info, &pub, true) == CELIX_SUCCESS){ - char* scope = pubsub_getScopeFromFilter(info->filter); - char* topic = pubsub_getTopicFromFilter(info->filter); - if(scope == NULL) { - scope = strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT); - } - - //TODO: Can we use a better serviceID?? - bundle_pt bundle = NULL; - long bundleId = -1; - bundleContext_getBundle(info->context,&bundle); - bundle_getBundleId(bundle,&bundleId); + celixThreadMutex_lock(&manager->publicationsLock); + char *pub_key = createScopeTopicKey(pub->scope, pub->topic); + array_list_pt pub_list_by_topic = hashMap_get(manager->publications, pub_key); + if(pub_list_by_topic==NULL){ + arrayList_create(&pub_list_by_topic); + hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic); + } + free(pub_key); + arrayList_add(pub_list_by_topic,pub); - if(fwUUID !=NULL && topic !=NULL){ + celixThreadMutex_unlock(&manager->publicationsLock); - pubsub_endpoint_pt pub = NULL; - if(pubsubEndpoint_create(fwUUID, scope, topic,bundleId,NULL,&pub) == CELIX_SUCCESS){ + int j; + double score = 0; + double best_score = 0; + pubsub_admin_service_pt best_psa = NULL; + celixThreadMutex_lock(&manager->psaListLock); - celixThreadMutex_lock(&manager->publicationsLock); - char *pub_key = createScopeTopicKey(scope, topic); - array_list_pt pub_list_by_topic = hashMap_get(manager->publications, pub_key); - if(pub_list_by_topic==NULL){ - arrayList_create(&pub_list_by_topic); - hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic); + for(j=0;j<arrayList_size(manager->psaList);j++){ + pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j); + 
psa->matchEndpoint(psa->admin,pub,&score); + if(score>best_score){ /* We have a new winner! */ + best_score = score; + best_psa = psa; } - free(pub_key); - arrayList_add(pub_list_by_topic,pub); - - celixThreadMutex_unlock(&manager->publicationsLock); - - int j; - celixThreadMutex_lock(&manager->psaListLock); - - double highest_score = -1; - pubsub_admin_service_pt best_psa = NULL; + } - for(j=0;j<arrayList_size(manager->psaList);j++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j); - double score; - psa->matchPublisher(psa->admin, pub, &score); - if (score > highest_score){ - highest_score = score; - best_psa = psa; + if(best_psa != NULL && best_score>0){ + status = best_psa->addPublication(best_psa->admin,pub); + if(status==CELIX_SUCCESS){ + celixThreadMutex_lock(&manager->discoveryListLock); + hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList); + while(hashMapIterator_hasNext(iter)){ + service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); + publisher_endpoint_announce_pt disc = NULL; + bundleContext_getService(manager->context, disc_sr, (void**) &disc); + disc->announcePublisher(disc->handle,pub); + bundleContext_ungetService(manager->context, disc_sr, NULL); } + hashMapIterator_destroy(iter); + celixThreadMutex_unlock(&manager->discoveryListLock); } - if (best_psa != NULL){ - status = best_psa->addPublication(best_psa->admin,pub); - if(status==CELIX_SUCCESS){ - celixThreadMutex_lock(&manager->discoveryListLock); - hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList); - while(hashMapIterator_hasNext(iter)){ - service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); - publisher_endpoint_announce_pt disc = NULL; - bundleContext_getService(manager->context, disc_sr, (void**) &disc); - disc->announcePublisher(disc->handle,pub); - bundleContext_ungetService(manager->context, disc_sr, NULL); - } - 
hashMapIterator_destroy(iter); - celixThreadMutex_unlock(&manager->discoveryListLock); - } - } - - celixThreadMutex_unlock(&manager->psaListLock); - } - } - else{ - status=CELIX_INVALID_BUNDLE_CONTEXT; + celixThreadMutex_unlock(&manager->psaListLock); + } - free(topic); - free(scope); } return status; @@ -746,62 +552,41 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra listener_hook_info_pt info = arrayList_get(listeners, l_index); - char* pub_scope = pubsub_getScopeFromFilter(info->filter); - char* pub_topic = pubsub_getTopicFromFilter(info->filter); - - const char* fwUUID=NULL; - bundleContext_getProperty(info->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); - - //TODO: Can we use a better serviceID?? - bundle_pt bundle = NULL; - long bundleId = -1; - bundleContext_getBundle(info->context,&bundle); - bundle_getBundleId(bundle,&bundleId); - - if(bundle !=NULL && pub_topic !=NULL && bundleId>0){ - - pubsub_endpoint_pt pubcmp = NULL; - if(pubsubEndpoint_create(fwUUID, pub_scope, pub_topic,bundleId,NULL,&pubcmp) == CELIX_SUCCESS){ - - int j,k; - celixThreadMutex_lock(&manager->psaListLock); - celixThreadMutex_lock(&manager->publicationsLock); - - char *pub_key = createScopeTopicKey(pub_scope, pub_topic); - array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key); - if(pub_list_by_topic!=NULL){ - for(j=0;j<arrayList_size(pub_list_by_topic);j++){ - pubsub_endpoint_pt pub = arrayList_get(pub_list_by_topic,j); - if(pubsubEndpoint_equals(pub,pubcmp)){ - double highest_score = -1; - pubsub_admin_service_pt best_psa = NULL; - - for(k=0;k<arrayList_size(manager->psaList);k++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k); - double score; - psa->matchPublisher(psa->admin, pub, &score); - if (score > highest_score){ - highest_score = score; - best_psa = psa; + pubsub_endpoint_pt pubcmp = NULL; + if(pubsubEndpoint_createFromListenerHookInfo(info,&pubcmp,true) == CELIX_SUCCESS){ + 
+ + int j,k; + celixThreadMutex_lock(&manager->psaListLock); + celixThreadMutex_lock(&manager->publicationsLock); + + char *pub_key = createScopeTopicKey(pubcmp->scope, pubcmp->topic); + array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key); + if(pub_list_by_topic!=NULL){ + for(j=0;j<arrayList_size(pub_list_by_topic);j++){ + pubsub_endpoint_pt pub = arrayList_get(pub_list_by_topic,j); + if(pubsubEndpoint_equals(pub,pubcmp)){ + for(k=0;k<arrayList_size(manager->psaList);k++){ + pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k); + status = psa->removePublication(psa->admin,pub); + if(status==CELIX_SUCCESS){ /* We found the one that manages this endpoint */ + celixThreadMutex_lock(&manager->discoveryListLock); + hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList); + while(hashMapIterator_hasNext(iter)){ + service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); + publisher_endpoint_announce_pt disc = NULL; + bundleContext_getService(manager->context, disc_sr, (void**) &disc); + disc->removePublisher(disc->handle,pub); + bundleContext_ungetService(manager->context, disc_sr, NULL); } + hashMapIterator_destroy(iter); + celixThreadMutex_unlock(&manager->discoveryListLock); } - if (best_psa != NULL){ - status = best_psa->removePublication(best_psa->admin,pub); - if(status==CELIX_SUCCESS){ - celixThreadMutex_lock(&manager->discoveryListLock); - hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList); - while(hashMapIterator_hasNext(iter)){ - service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); - publisher_endpoint_announce_pt disc = NULL; - bundleContext_getService(manager->context, disc_sr, (void**) &disc); - disc->removePublisher(disc->handle,pub); - bundleContext_ungetService(manager->context, disc_sr, NULL); - } - hashMapIterator_destroy(iter); - celixThreadMutex_unlock(&manager->discoveryListLock); - } + else 
if(status == CELIX_ILLEGAL_ARGUMENT){ /* Not a real error, just saying this psa does not handle this endpoint */ + status = CELIX_SUCCESS; } } + //} arrayList_remove(pub_list_by_topic,j); /* If it was the last publisher for this topic, tell PSA to close the ZMQ socket and then inform the discovery */ @@ -813,26 +598,20 @@ celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, arra } pubsubEndpoint_destroy(pub); - } - } - celixThreadMutex_unlock(&manager->publicationsLock); - celixThreadMutex_unlock(&manager->psaListLock); + } + } - pubsubEndpoint_destroy(pubcmp); + celixThreadMutex_unlock(&manager->publicationsLock); + celixThreadMutex_unlock(&manager->psaListLock); - free(pub_key); + free(pub_key); - } + pubsubEndpoint_destroy(pubcmp); } - else{ - status=CELIX_INVALID_BUNDLE_CONTEXT; - } - free(pub_scope); - free(pub_topic); } return status; @@ -846,8 +625,6 @@ celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_end celixThreadMutex_lock(&manager->psaListLock); celixThreadMutex_lock(&manager->publicationsLock); - int i; - char *pub_key = createScopeTopicKey(pubEP->scope, pubEP->topic); array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key); @@ -859,23 +636,28 @@ celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_end /* Shouldn't be any other duplicate, since it's filtered out by the discovery */ pubsub_endpoint_pt p = NULL; - pubsubEndpoint_create(pubEP->frameworkUUID,pubEP->scope,pubEP->topic,pubEP->serviceID,pubEP->endpoint,&p); + pubsubEndpoint_clone(pubEP, &p); arrayList_add(pub_list_by_topic,p); - double highest_score = -1; + int j; + double score = 0; + double best_score = 0; pubsub_admin_service_pt best_psa = NULL; - for(i=0;i<arrayList_size(manager->psaList);i++){ - pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i); - double score; - psa->matchPublisher(psa->admin, p, &score); - if (score > highest_score){ - highest_score = 
score; + for(j=0;j<arrayList_size(manager->psaList);j++){ + pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j); + psa->matchEndpoint(psa->admin,p,&score); + if(score>best_score){ /* We have a new winner! */ + best_score = score; best_psa = psa; } } - if (best_psa != NULL){ - status += best_psa->addPublication(best_psa->admin,p); + + if(best_psa != NULL && best_score>0){ + best_psa->addPublication(best_psa->admin,p); + } + else{ + status = CELIX_ILLEGAL_STATE; } celixThreadMutex_unlock(&manager->publicationsLock); @@ -911,20 +693,10 @@ celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpo if(found && p !=NULL){ - double highest_score = -1; - pubsub_admin_service_pt best_psa = NULL; - for(i=0;i<arrayList_size(manager->psaList);i++){ pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i); - double score; - psa->matchPublisher(psa->admin, p, &score); - if (score > highest_score){ - highest_score = score; - best_psa = psa; - } - } - if (best_psa != NULL){ - status += best_psa->removePublication(best_psa->admin,p); + /* No problem with invoking removal on all psa's, only the one that manage this topic will do something */ + psa->removePublication(psa->admin,p); } arrayList_removeElement(pub_list_by_topic,p); @@ -934,7 +706,7 @@ celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpo for(i=0;i<arrayList_size(manager->psaList);i++){ pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i); - status += psa->closeAllPublications(psa->admin,p->scope, p->topic); + psa->closeAllPublications(psa->admin,p->scope, p->topic); } }
